diff --git a/build.gradle b/build.gradle index 29311694bb..13e9c1abf3 100644 --- a/build.gradle +++ b/build.gradle @@ -518,6 +518,27 @@ project(":samza-yarn_$scalaSuffix") { jar.dependsOn("lesscss") } +project(":samza-kubernetes_$scalaSuffix") { + apply plugin: 'java' + + dependencies { + compile project(':samza-api') + compile project(":samza-core_$scalaSuffix") + compile "org.codehaus.jackson:jackson-core-asl:1.9.7" + compile group: 'io.fabric8', name: 'kubernetes-client', version: kubernetesJavaClientVersion + testCompile "junit:junit:$junitVersion" + testCompile "org.mockito:mockito-core:$mockitoVersion" + } + + tasks.create(name: "releaseKubeTarGz", dependsOn: configurations.archives.artifacts, type: Tar) { + into "samza-kubernetes-${version}" + compression = Compression.GZIP + from(configurations.runtime) { into("lib/") } + from(configurations.archives.artifacts.files) { into("lib/") } + duplicatesStrategy 'exclude' + } +} + project(":samza-shell") { apply plugin: 'java' diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index e05b837298..b022b00d0f 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -55,4 +55,5 @@ jnaVersion = "4.5.1" couchbaseClientVersion = "2.7.2" couchbaseMockVersion = "1.5.22" + kubernetesJavaClientVersion = "4.1.3" } diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index 143e0b3d79..fc89858737 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -179,7 +179,11 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri hostAffinityEnabled, jobConfig.getStandbyTasksEnabled(), localityManager, faultDomainManager, config); this.containerAllocator = new 
ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager); - this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); + if (shouldStartAllocateThread()) { + this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); + } else { + this.allocatorThread = null; + } this.restartContainers = restartContainers; LOG.info("Finished container process manager initialization."); } @@ -211,15 +215,23 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri LOG.info("Finished container process manager initialization"); } + // In Kubernetes, the pod will be started by kubelet automatically once it is allocated, it does not need a + // separate thread to keep polling the allocated resources to start the container. + public boolean shouldStartAllocateThread() { + return !clusterResourceManager.getClass().getSimpleName().equals("KubeClusterResourceManager"); + } + public boolean shouldShutdown() { LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}, Is allocator thread alive: {}", - state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no"); + state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no", allocatorThread != null && allocatorThread.isAlive() ? "yes" : "no"); if (exceptionOccurred != null) { LOG.error("Exception in container process manager", exceptionOccurred); throw new SamzaException(exceptionOccurred); } - return jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get() || !allocatorThread.isAlive(); + + boolean shouldShutdown = jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get(); + return allocatorThread == null ? 
shouldShutdown : (shouldShutdown || !allocatorThread.isAlive()); } public void start() { @@ -278,7 +290,9 @@ public void start() { // Start container allocator thread LOG.info("Starting the container allocator thread"); - allocatorThread.start(); + if (allocatorThread != null) { + allocatorThread.start(); + } LOG.info("Starting the container process manager"); } @@ -287,12 +301,14 @@ public void stop() { // Shutdown allocator thread containerAllocator.stop(); - try { - allocatorThread.join(); - LOG.info("Stopped container allocator"); - } catch (InterruptedException ie) { - LOG.error("Allocator thread join threw an interrupted exception", ie); - Thread.currentThread().interrupt(); + if (allocatorThread != null) { + try { + allocatorThread.join(); + LOG.info("Stopped container allocator"); + } catch (InterruptedException ie) { + LOG.error("Allocator thread join threw an interrupted exception", ie); + Thread.currentThread().interrupt(); + } } if (diagnosticsManager.isDefined()) { diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala index 384972262e..a2009b6150 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala @@ -125,7 +125,7 @@ class HttpServer( } /** - * Returns the URL for the root of the HTTP server. This method + * Returns the URL for the root of the HTTP server. This URL is generated with host name. */ def getUrl = { if (running) { @@ -136,4 +136,17 @@ class HttpServer( throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.") } } + + /** + * Returns the URL for the root of the HTTP server. This URL is generated with host address. 
+ */ + def getIpUrl = { + if (running) { + val runningPort = server.getConnectors()(0).asInstanceOf[NetworkConnector].getLocalPort() + + new URL("http://" + Util.getLocalHost.getHostAddress + ":" + runningPort + rootPath) + } else { + throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.") + } + } } diff --git a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala index 577bba63da..3761826db3 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala @@ -54,7 +54,7 @@ object HttpUtil extends Logging { (exception, loop) => { exception match { case ioe: IOException => { - warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass) + error("Error getting response from Job coordinator server. Received IOException: %s. Retrying..." format ioe) httpConn = getHttpConnection(url, timeout) } case e: Exception => diff --git a/samza-kubernetes/src/docker/dockerfiles/Dockerfile b/samza-kubernetes/src/docker/dockerfiles/Dockerfile new file mode 100644 index 0000000000..98b1229a51 --- /dev/null +++ b/samza-kubernetes/src/docker/dockerfiles/Dockerfile @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# samzaJarsFolder must contain all the Samza jars (make sure every Samza jar is there).
+# You can build the Samza image with:
+# docker build -t dockerHubAccount/samza:versionNumber .
+# Samza users can then use the Samza image as the base image to build their application image.
+#
+
+FROM ubuntu:latest
+
+RUN apt-get update -y && apt-get upgrade -y && apt-get install -y openjdk-8-jdk
+
+ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64
+ENV PATH $PATH:$JAVA_HOME/bin
+
+RUN mkdir -p /opt/samza
+WORKDIR /opt/samza/
+COPY samzaJarsFolder/ /opt/samza/
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/README.md b/samza-kubernetes/src/main/java/org/apache/samza/README.md
new file mode 100644
index 0000000000..0d87a29e8d
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/README.md
@@ -0,0 +1,26 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Configurations
+- kube.app.image: the image name for the Samza app
+- kube.app.namespace: the namespace where the Samza app runs
+- kube.app.pod.mnt.path: the path where the remote volume is mounted into the pod, for both the job coordinator pod and the stream processor pods.
+  The volume can be used for storing logs and local state.
+- cluster-manager.container.memory.mb: the memory size (in MB) for the Samza stream processor
+- cluster-manager.container.cpu.cores: the number of CPU cores for the Samza stream processor
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java b/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java
new file mode 100644
index 0000000000..c5d3121ddd
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+/**
+ * Kubernetes related configurations: the config keys read by the Kubernetes job and resource manager, the
+ * defaults used when a key is not set, and the naming conventions for the pods and containers they create.
+ */
+public class KubeConfig {
+
+  // Config key for the image the Samza pods run, and the image used when the key is not set
+  public static final String APP_IMAGE = "kube.app.image";
+  public static final String DEFAULT_IMAGE = "samza/samza:v0";
+
+  // The default working directory inside the image; commands such as bin/run-jc.sh are resolved against it
+  public static final String DEFAULT_DIRECTORY = "/opt/samza/";
+
+  // Defaults for the memory (MB) and the cpu cores requested for the job coordinator container
+  public static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_MEM_SIZE = 1024;
+  public static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM = 1;
+
+  // Config key for the path where the volume is mounted in the pods (also exported as the log directory),
+  // and the path used when the key is not set
+  public static final String SAMZA_MOUNT_DIR = "kube.app.pod.mnt.path";
+  public static final String DEFAULT_SAMZA_MOUNT_DIR = "/tmp/mnt";
+
+  // Config key for the Kubernetes namespace the pods are created in
+  public static final String K8S_API_NAMESPACE = "kube.app.namespace";
+
+  // Naming conventions for the pods and containers, and the restart policy set on every pod
+  public static final String STREAM_PROCESSOR_CONTAINER_NAME_PREFIX = "sp";
+  public static final String JC_CONTAINER_NAME_PREFIX = "jc";
+  public static final String POD_RESTART_POLICY = "OnFailure";
+  public static final String JC_POD_NAME_FORMAT = "%s-%s-%s"; // jc-appName-appId
+  public static final String TASK_POD_NAME_FORMAT = "%s-%s-%s-%s"; // sp-appName-appId-containerId
+
+  // Name of the environment variable that carries the job coordinator pod name into the job coordinator
+  public static final String COORDINATOR_POD_NAME = "COORDINATOR_POD_NAME";
+
+  // Config keys (and defaults) for mounting an Azure file share into the pods
+  public static final String AZURE_REMOTE_VOLUME_ENABLED = "kube.app.volume.azure.file-share.enabled";
+  public static final String AZURE_SECRET = "kube.app.volume.azure-secret";
+  public static final String DEFAULT_AZURE_SECRET = "azure-secret";
+  public static final String AZURE_FILESHARE = "kube.app.volume.azure.file-share";
+  public static final String DEFAULT_AZURE_FILESHARE = "aksshare";
+
+  // The wrapped job config; never reassigned after construction
+  private final Config config;
+
+  /**
+   * Wraps the given config.
+   *
+   * @param config the job config to read the Kubernetes settings from
+   */
+  public KubeConfig(Config config) {
+    this.config = config;
+  }
+
+  /**
+   * Creates a {@link KubeConfig} for the given config and validates it.
+   *
+   * @param config the job config to wrap
+   * @return the validated {@link KubeConfig}
+   * @throws ConfigException declared for validation failures; the validation itself is not implemented yet
+   */
+  public static KubeConfig validate(Config config) throws ConfigException {
+    KubeConfig kubeConfig = new KubeConfig(config);
+    kubeConfig.validate();
+    return kubeConfig;
+  }
+
+  // TODO: SAMZA-2365: validate KubeConfig before starting the job
+  private void validate() throws ConfigException {
+
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java
new file mode 100644
index 0000000000..0c1d2b1b83
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.kubernetes;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory for the {@link KubernetesClient} used by the Samza Kubernetes integration.
+ */
+public class KubeClientFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(KubeClientFactory.class);
+
+  /**
+   * Builds a client from an otherwise untouched {@link ConfigBuilder}, i.e. with the client library's defaults.
+   *
+   * @return a new {@link DefaultKubernetesClient}
+   */
+  public static KubernetesClient create() {
+    final Config defaultConfig = new ConfigBuilder().build();
+    final KubernetesClient kubernetesClient = new DefaultKubernetesClient(defaultConfig);
+    LOG.info("Kubernetes client created. ");
+    return kubernetesClient;
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java
new file mode 100644
index 0000000000..329be961d7
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.kubernetes;
+
+import io.fabric8.kubernetes.api.model.*;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import java.net.URL;
+import java.util.*;
+import org.apache.samza.clustermanager.ClusterResourceManager;
+import org.apache.samza.clustermanager.ResourceRequestState;
+import org.apache.samza.clustermanager.SamzaApplicationState;
+import org.apache.samza.clustermanager.SamzaResourceRequest;
+import org.apache.samza.clustermanager.SamzaResource;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.job.CommandBuilder;
+import org.apache.samza.job.ShellCommandBuilder;
+import org.apache.samza.util.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.config.ApplicationConfig.*;
+import static org.apache.samza.config.KubeConfig.*;
+
+/**
+ * A {@link KubeClusterResourceManager} implements a ClusterResourceManager using Kubernetes as the underlying
+ * resource manager.
+ *
+ * <p>Every resource request is turned into one pod that runs one stream processor container. The pods carry an
+ * owner reference to the job coordinator (JC) pod and a label with the JC pod name; the pod watcher uses that
+ * label to pick the pods of this job. Failed pods are deleted, and a deleted pod triggers a new resource request
+ * for the same Samza container.
+ */
+public class KubeClusterResourceManager extends ClusterResourceManager {
+  private static final Logger LOG = LoggerFactory.getLogger(KubeClusterResourceManager.class);
+
+  // Label put on every stream processor pod; its value is the name of the job coordinator pod that owns it.
+  private static final String JC_POD_NAME_LABEL = "jc-pod-name";
+  // Name shared by the Azure file volume and the volume mount that refers to it.
+  private static final String AZURE_VOLUME_NAME = "azure";
+  // Node label matched when a request asks for a preferred host.
+  private static final String NODE_HOSTNAME_LABEL = "kubernetes.io/hostname";
+  // Kubernetes only accepts weights between 1 and 100 for preferred scheduling terms.
+  private static final int PREFERRED_HOST_AFFINITY_WEIGHT = 100;
+
+  private final Map<String, String> podLabels = new HashMap<>();
+  private final KubernetesClient client;
+  private final String appId;
+  private final String appName;
+  private final String image;
+  private final String namespace;
+  private final JobModelManager jobModelManager;
+  private final Config config;
+  // Only read by the locality logic that is currently commented out in createNewStreamProcessor (SAMZA-2629).
+  private final boolean hostAffinityEnabled;
+  private String jcPodName;
+  private OwnerReference ownerReference;
+  // Handle of the running pod watch; kept so that stop() can close it.
+  private Watch podWatch;
+
+  /**
+   * Creates the resource manager and resolves the job coordinator pod this process runs in.
+   *
+   * @param config the job config
+   * @param jobModelManager the job model manager whose HTTP server URL is handed to the stream processors
+   * @param callback the callback of the cluster resource manager
+   * @throws IllegalStateException if the job coordinator pod cannot be resolved
+   */
+  KubeClusterResourceManager(Config config, JobModelManager jobModelManager, ClusterResourceManager.Callback callback) {
+    super(callback);
+    this.config = config;
+    this.client = KubeClientFactory.create();
+    this.jobModelManager = jobModelManager;
+    this.image = config.get(APP_IMAGE, DEFAULT_IMAGE);
+    this.namespace = config.get(K8S_API_NAMESPACE, "default");
+    this.appId = config.get(APP_ID, "001");
+    this.appName = config.get(APP_NAME, "samza");
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+    this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
+    createOwnerReferences();
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Kubernetes Cluster ResourceManager started, starting watcher");
+    startPodWatcher();
+    jobModelManager.start();
+  }
+
+  // Create the owner reference for the samza-job-coordinator pod
+  private void createOwnerReferences() {
+    this.jcPodName = System.getenv(COORDINATOR_POD_NAME);
+    LOG.info("job coordinator pod name is: {}, namespace is: {}", jcPodName, namespace);
+    // Fail with a clear message instead of a NullPointerException further down.
+    if (jcPodName == null || jcPodName.isEmpty()) {
+      throw new IllegalStateException("Environment variable " + COORDINATOR_POD_NAME + " is not set");
+    }
+    Pod pod = client.pods().inNamespace(namespace).withName(jcPodName).get();
+    if (pod == null) {
+      throw new IllegalStateException(
+          "Job coordinator pod " + jcPodName + " was not found in namespace " + namespace);
+    }
+    ownerReference = new OwnerReferenceBuilder()
+        .withName(pod.getMetadata().getName())
+        .withApiVersion(pod.getApiVersion())
+        .withUid(pod.getMetadata().getUid())
+        .withKind(pod.getKind())
+        .withController(true).build();
+    podLabels.put(JC_POD_NAME_LABEL, jcPodName);
+  }
+
+  /**
+   * Starts watching the pods labeled with this job coordinator's pod name. Failed pods are deleted, and a
+   * deleted pod triggers a new resource request for the same Samza container.
+   */
+  public void startPodWatcher() {
+    Watcher<Pod> watcher = new Watcher<Pod>() {
+      @Override
+      public void eventReceived(Action action, Pod pod) {
+        String podName = pod.getMetadata().getName();
+        // Pods without labels, or without the JC label, do not belong to this job coordinator.
+        Map<String, String> labels = pod.getMetadata().getLabels();
+        String ownerJcPodName = labels == null ? null : labels.get(JC_POD_NAME_LABEL);
+        if (!jcPodName.equals(ownerJcPodName)) {
+          LOG.warn("This JC pod name is " + jcPodName + ", received pods for a different JC " + ownerJcPodName);
+          return;
+        }
+        LOG.info("Pod watcher received action " + action + " for pod " + podName);
+        switch (action) {
+          case ADDED:
+            LOG.info("Pod " + podName + " is added.");
+            break;
+          case MODIFIED:
+            LOG.info("Pod " + podName + " is modified.");
+            if (isPodFailed(pod)) {
+              deletePod(pod);
+            }
+            break;
+          case ERROR:
+            LOG.info("Pod " + podName + " received error.");
+            if (isPodFailed(pod)) {
+              deletePod(pod);
+            }
+            break;
+          case DELETED:
+            LOG.info("Pod " + podName + " is deleted.");
+            createNewStreamProcessor(pod);
+            break;
+          default:
+            break;
+        }
+      }
+
+      @Override
+      public void onClose(KubernetesClientException e) {
+        if (e == null) {
+          LOG.info("Pod watcher closed");
+        } else {
+          LOG.error("Pod watcher closed", e);
+        }
+      }
+    };
+
+    // TODO: SAMZA-2367: "podLabels" is empty. Need to add labels when creating Pod
+    // NOTE(review): createOwnerReferences() already puts the "jc-pod-name" label into podLabels - confirm
+    // whether the TODO above is still accurate.
+    podWatch = client.pods().inNamespace(namespace).withLabels(podLabels).watch(watcher);
+  }
+
+  // A pod counts as failed only when it reports the "Failed" phase; a missing status or phase is not a failure.
+  private boolean isPodFailed(Pod pod) {
+    return pod.getStatus() != null && "Failed".equals(pod.getStatus().getPhase());
+  }
+
+  private void deletePod(Pod pod) {
+    // delete() returns a boxed Boolean; treat anything but TRUE as a failure.
+    boolean deleted = Boolean.TRUE.equals(client.pods().inNamespace(namespace).delete(pod));
+    if (deleted) {
+      LOG.info("Deleted pod " + pod.getMetadata().getName());
+    } else {
+      LOG.warn("Failed to delete pod " + pod.getMetadata().getName());
+    }
+  }
+
+  // Requests a replacement pod with the same cpu and memory requests as the first container of the given pod.
+  private void createNewStreamProcessor(Pod pod) {
+    // NOTE(review): assumes the request amounts are plain integers, as written by KubeUtils.createContainer -
+    // confirm; a suffixed quantity would make Integer.parseInt throw.
+    Map<String, Quantity> requests = pod.getSpec().getContainers().get(0).getResources().getRequests();
+    int memory = Integer.parseInt(requests.get("memory").getAmount());
+    int cpu = Integer.parseInt(requests.get("cpu").getAmount());
+
+    String containerId = KubeUtils.getSamzaContainerNameFromPodName(pod.getMetadata().getName());
+
+    // Find out previously running container location
+    // TODO: SAMZA-2629: need to get the locality information. The logic below only works for samza 1.3 or earlier version.
+    /* String lastSeenOn = jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
+    if (!hostAffinityEnabled || lastSeenOn == null) {
+      lastSeenOn = ResourceRequestState.ANY_HOST;
+    } */
+    String lastSeenOn = ResourceRequestState.ANY_HOST;
+    SamzaResourceRequest request = new SamzaResourceRequest(cpu, memory, lastSeenOn, containerId);
+    requestResources(request);
+  }
+
+  /**
+   * Creates one pod, running one stream processor container, for the given request. When the request names a
+   * preferred host, the pod gets a preferred (not required) node affinity for that host.
+   *
+   * @param resourceRequest the resources, processor id and preferred host to create the pod for
+   */
+  @Override
+  public void requestResources(SamzaResourceRequest resourceRequest) {
+    String samzaContainerId = resourceRequest.getProcessorId();
+    String preferredHost = resourceRequest.getPreferredHost();
+    LOG.info("Requesting resources on " + preferredHost + " for container " + samzaContainerId);
+    CommandBuilder builder = getCommandBuilder(samzaContainerId);
+    String command = buildCmd(builder);
+    LOG.info("Container ID {} using command {}", samzaContainerId, command);
+    Container container = KubeUtils.createContainer(STREAM_PROCESSOR_CONTAINER_NAME_PREFIX, image, resourceRequest, command);
+    container.setEnv(getEnvs(builder));
+    String podName = String.format(TASK_POD_NAME_FORMAT, STREAM_PROCESSOR_CONTAINER_NAME_PREFIX, appName, appId, samzaContainerId);
+
+    // The Azure file share is optional, so a missing key means "disabled". The mount has to be set on the
+    // container before the container is added to the pod spec below.
+    Volume azureVolume = null;
+    if (config.getBoolean(AZURE_REMOTE_VOLUME_ENABLED, false)) {
+      azureVolume = createAzureFileVolume();
+      container.setVolumeMounts(Collections.singletonList(createAzureVolumeMount(podName)));
+    }
+
+    PodBuilder podBuilder = new PodBuilder()
+        .editOrNewMetadata()
+        .withName(podName)
+        .withOwnerReferences(ownerReference)
+        .addToLabels(podLabels)
+        .endMetadata()
+        .editOrNewSpec()
+        .withRestartPolicy(POD_RESTART_POLICY)
+        .addToContainers(container)
+        .endSpec();
+    if (azureVolume != null) {
+      podBuilder = podBuilder.editOrNewSpec().withVolumes(azureVolume).endSpec();
+    }
+
+    if (preferredHost == null || ResourceRequestState.ANY_HOST.equals(preferredHost)) {
+      // Create a pod with only one container in anywhere
+      LOG.debug("No preferred host for container {}", samzaContainerId);
+    } else {
+      LOG.info("Making a preferred host request on " + preferredHost);
+      // Node selector requirements support In/NotIn/Exists/DoesNotExist/Gt/Lt; "In" with a single value is the
+      // equality match. A preferred term also needs a weight in the range 1-100.
+      podBuilder = podBuilder.editOrNewSpec().editOrNewAffinity().editOrNewNodeAffinity()
+          .addNewPreferredDuringSchedulingIgnoredDuringExecution()
+          .withWeight(PREFERRED_HOST_AFFINITY_WEIGHT)
+          .withNewPreference()
+          .addNewMatchExpression()
+          .withKey(NODE_HOSTNAME_LABEL)
+          .withOperator("In")
+          .withValues(preferredHost).endMatchExpression()
+          .endPreference().endPreferredDuringSchedulingIgnoredDuringExecution().endNodeAffinity().endAffinity().endSpec();
+    }
+    Pod pod = podBuilder.build();
+    client.pods().inNamespace(namespace).create(pod);
+    LOG.info("Created a pod " + pod.getMetadata().getName() + " on " + preferredHost);
+  }
+
+  // Builds the volume backed by the configured Azure file share.
+  private Volume createAzureFileVolume() {
+    AzureFileVolumeSource azureFileVolumeSource = new AzureFileVolumeSource(false,
+        config.get(AZURE_SECRET, DEFAULT_AZURE_SECRET), config.get(AZURE_FILESHARE, DEFAULT_AZURE_FILESHARE));
+    Volume volume = new Volume();
+    volume.setAzureFile(azureFileVolumeSource);
+    volume.setName(AZURE_VOLUME_NAME);
+    return volume;
+  }
+
+  // Builds the mount of the Azure volume; every pod uses a sub path named after the pod.
+  private VolumeMount createAzureVolumeMount(String podName) {
+    String mountPath = config.get(SAMZA_MOUNT_DIR, DEFAULT_SAMZA_MOUNT_DIR);
+    VolumeMount volumeMount = new VolumeMount();
+    volumeMount.setMountPath(mountPath);
+    volumeMount.setName(AZURE_VOLUME_NAME);
+    volumeMount.setSubPath(podName);
+    LOG.info("Set subpath to " + podName + ", mountpath to " + mountPath);
+    return volumeMount;
+  }
+
+  @Override
+  public void cancelResourceRequest(SamzaResourceRequest request) {
+    // no need to implement
+  }
+
+  @Override
+  public void releaseResources(SamzaResource resource) {
+    // no need to implement
+  }
+
+  @Override
+  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
+    // no need to implement
+  }
+
+  @Override
+  public void stopStreamProcessor(SamzaResource resource) {
+    // Pods are created in "namespace", so they have to be deleted from the same namespace.
+    client.pods().inNamespace(namespace).withName(resource.getContainerId()).delete();
+  }
+
+  @Override
+  public void stop(SamzaApplicationState.SamzaAppStatus status) {
+    LOG.info("Kubernetes Cluster ResourceManager stopped");
+    // Release the watch connection opened by startPodWatcher().
+    if (podWatch != null) {
+      podWatch.close();
+      podWatch = null;
+    }
+    jobModelManager.stop();
+  }
+
+  private String buildCmd(CommandBuilder cmdBuilder) {
+    cmdBuilder.setCommandPath(DEFAULT_DIRECTORY);
+    return cmdBuilder.buildCommand();
+  }
+
+  // Instantiates the configured command builder and points it at the job coordinator's HTTP server.
+  private CommandBuilder getCommandBuilder(String containerId) {
+    TaskConfig taskConfig = new TaskConfig(config);
+    String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
+    CommandBuilder cmdBuilder = ReflectionUtil.getObj(cmdBuilderClassName, CommandBuilder.class);
+    if (jobModelManager.server() == null) {
+      LOG.error("HttpServer is null");
+      // Without the server there is no URL to give to the container; fail here rather than with a
+      // NullPointerException on the next line.
+      throw new IllegalStateException("HttpServer is null, cannot build the command for container " + containerId);
+    }
+    URL url = jobModelManager.server().getIpUrl();
+    LOG.info("HttpServer URL: " + url);
+    cmdBuilder.setConfig(config).setId(containerId).setUrl(url);
+
+    return cmdBuilder;
+  }
+
+  // Construct the envs for the task container pod
+  private List<EnvVar> getEnvs(CommandBuilder cmdBuilder) {
+    // for logging
+    StringBuilder sb = new StringBuilder();
+
+    List<EnvVar> envList = new ArrayList<>();
+    for (Map.Entry<String, String> entry : cmdBuilder.buildEnvironment().entrySet()) {
+      envList.add(new EnvVar(entry.getKey(), entry.getValue(), null));
+      sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue())); //logging
+    }
+
+    // TODO: SAMZA-2366: make the container ID as an execution environment and pass it to the container.
+    // Seems there is no such id (K8s container id)?
+    // envList.add(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), container.getId().toString());
+    // sb.append(String.format("\n%s=%s", ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), container.getId().toString()));
+
+    // Use the same default as the volume mount so the directories are never exported as null values.
+    String mountDir = config.get(SAMZA_MOUNT_DIR, DEFAULT_SAMZA_MOUNT_DIR);
+    envList.add(new EnvVar("LOGGED_STORE_BASE_DIR", mountDir, null));
+    envList.add(new EnvVar("EXECUTION_PLAN_DIR", mountDir, null));
+    envList.add(new EnvVar("SAMZA_LOG_DIR", mountDir, null));
+
+    LOG.info("Using environment variables: {}", sb);
+
+    return envList;
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java
new file mode 100644
index 0000000000..39566aaf16
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.kubernetes;
+
+import io.fabric8.kubernetes.api.model.*;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.ResourceRequestState;
+import org.apache.samza.clustermanager.SamzaResourceRequest;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.job.StreamJob;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.util.CoordinatorStreamUtil;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.samza.config.ClusterManagerConfig.CLUSTER_MANAGER_MEMORY_MB;
+import static org.apache.samza.config.ClusterManagerConfig.CLUSTER_MANAGER_MAX_CORES;
+import static org.apache.samza.config.KubeConfig.*;
+import static org.apache.samza.job.ApplicationStatus.*;
+
+/**
+ * The client to start a Kubernetes job coordinator.
+ *
+ * <p>The job coordinator runs in a single pod named after the application name and id; this class creates
+ * that pod, watches it, reports its phase as an {@link ApplicationStatus} and deletes it on kill.
+ */
+public class KubeJob implements StreamJob {
+  private static final Logger LOG = LoggerFactory.getLogger(KubeJob.class);
+
+  // Name shared by the Azure file volume and the volume mount that refers to it.
+  private static final String AZURE_VOLUME_NAME = "azure";
+
+  private final ApplicationConfig config;
+  private final KubernetesClient kubernetesClient;
+  private final String podName;
+  private final String nameSpace;
+  private final KubePodStatusWatcher watcher;
+  private final String image;
+  // Last status computed by getStatus(); returned again when the pod can no longer be queried.
+  private ApplicationStatus currentStatus;
+
+  /**
+   * @param configs the job config; the namespace and image fall back to "default" and the default Samza image
+   */
+  public KubeJob(Config configs) {
+    this.kubernetesClient = KubeClientFactory.create();
+    this.config = new ApplicationConfig(configs);
+    this.podName = String.format(JC_POD_NAME_FORMAT, JC_CONTAINER_NAME_PREFIX, config.getAppName(), config.getAppId());
+    this.currentStatus = ApplicationStatus.New;
+    this.watcher = new KubePodStatusWatcher(podName);
+    this.nameSpace = config.get(K8S_API_NAMESPACE, "default");
+    this.image = config.get(APP_IMAGE, DEFAULT_IMAGE);
+  }
+
+  /**
+   * Submits the Kubernetes job coordinator: creates the job coordinator pod and starts watching it.
+   *
+   * @return this job
+   */
+  public KubeJob submit() {
+    // create SamzaResourceRequest
+    int memoryMB = config.getInt(CLUSTER_MANAGER_MEMORY_MB, DEFAULT_CLUSTER_MANAGER_CONTAINER_MEM_SIZE);
+    int numCores = config.getInt(CLUSTER_MANAGER_MAX_CORES, DEFAULT_CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM);
+    String preferredHost = ResourceRequestState.ANY_HOST;
+    SamzaResourceRequest request = new SamzaResourceRequest(numCores, memoryMB, preferredHost, podName);
+
+    // create Container
+    // TODO: SAMZA-2368: Figure out "samza.fwk.path" and "samza.fwk.version" are still needed in Samza 1.3
+    String fwkPath = config.get("samza.fwk.path", "");
+    String fwkVersion = config.get("samza.fwk.version");
+    String cmd = buildJobCoordinatorCmd(fwkPath, fwkVersion);
+    LOG.info(String.format("samza.fwk.path: %s. samza.fwk.version: %s. Command: %s", fwkPath, fwkVersion, cmd));
+    Container container = KubeUtils.createContainer(JC_CONTAINER_NAME_PREFIX, image, request, cmd);
+    container.setEnv(getEnvs());
+
+    // The Azure file share is optional, so a missing key means "disabled". The mount has to be set on the
+    // container before the container is put into the pod spec below.
+    Volume azureVolume = null;
+    if (config.getBoolean(AZURE_REMOTE_VOLUME_ENABLED, false)) {
+      AzureFileVolumeSource azureFileVolumeSource = new AzureFileVolumeSource(false,
+          config.get(AZURE_SECRET, DEFAULT_AZURE_SECRET), config.get(AZURE_FILESHARE, DEFAULT_AZURE_FILESHARE));
+      azureVolume = new Volume();
+      azureVolume.setAzureFile(azureFileVolumeSource);
+      azureVolume.setName(AZURE_VOLUME_NAME);
+      VolumeMount volumeMount = new VolumeMount();
+      volumeMount.setMountPath(config.get(SAMZA_MOUNT_DIR, DEFAULT_SAMZA_MOUNT_DIR));
+      volumeMount.setName(AZURE_VOLUME_NAME);
+      volumeMount.setSubPath(podName);
+      container.setVolumeMounts(Collections.singletonList(volumeMount));
+    }
+
+    // create Pod
+    PodBuilder podBuilder = new PodBuilder()
+        .editOrNewMetadata()
+        .withNamespace(nameSpace)
+        .withName(podName)
+        .endMetadata()
+        .editOrNewSpec()
+        .withRestartPolicy(POD_RESTART_POLICY)
+        .withContainers(container)
+        .endSpec();
+    if (azureVolume != null) {
+      podBuilder = podBuilder.editOrNewSpec().withVolumes(azureVolume).endSpec();
+    }
+
+    Pod pod = podBuilder.build();
+    // Create and watch in the configured namespace, the same one kill() and getStatus() use.
+    kubernetesClient.pods().inNamespace(nameSpace).create(pod);
+    // TODO: SAMZA-2247: the watcher here makes Client hung (always waiting). Although it doesn't affect the operator
+    // and worker containers, we still need to fix this issue.
+    kubernetesClient.pods().inNamespace(nameSpace).withName(podName).watch(watcher);
+    return this;
+  }
+
+  /**
+   * Kill the job coordinator pod
+   *
+   * @return this job
+   */
+  public KubeJob kill() {
+    LOG.info("Killing application: {}, operator pod: {}, namespace: {}", config.getAppName(), podName, nameSpace);
+    kubernetesClient.pods().inNamespace(nameSpace).withName(podName).delete();
+    return this;
+  }
+
+  /**
+   * Waits on the pod watcher for the job coordinator pod to complete, then reports the current status.
+   *
+   * @param timeoutMs the timeout, in milliseconds, handed to the pod watcher
+   * @return the status of the job coordinator pod after the wait
+   */
+  public ApplicationStatus waitForFinish(long timeoutMs) {
+    watcher.waitForCompleted(timeoutMs, TimeUnit.MILLISECONDS);
+    return getStatus();
+  }
+
+  /**
+   * Wait for the application to reach a status
+   *
+   * @param status the status to wait for
+   * @param timeoutMs the timeout, in milliseconds, handed to the pod watcher
+   * @return the status that was waited for
+   * @throws SamzaException if the status is not one of New, Running, SuccessfulFinish or UnsuccessfulFinish
+   */
+  public ApplicationStatus waitForStatus(ApplicationStatus status, long timeoutMs) {
+    // NOTE(review): the requested status is returned even when the watcher wait ends without the pod reaching
+    // it - confirm how KubePodStatusWatcher signals a timeout.
+    switch (status.getStatusCode()) {
+      case New:
+        watcher.waitForPending(timeoutMs, TimeUnit.MILLISECONDS);
+        return New;
+      case Running:
+        watcher.waitForRunning(timeoutMs, TimeUnit.MILLISECONDS);
+        return Running;
+      case SuccessfulFinish:
+        watcher.waitForSucceeded(timeoutMs, TimeUnit.MILLISECONDS);
+        return SuccessfulFinish;
+      case UnsuccessfulFinish:
+        watcher.waitForFailed(timeoutMs, TimeUnit.MILLISECONDS);
+        return UnsuccessfulFinish;
+      default:
+        throw new SamzaException("Unsupported application status type: " + status);
+    }
+  }
+
+  /**
+   * Get the status of the job coordinator pod, derived from the pod phase.
+   *
+   * @return the status for the current pod phase, or the last known status when the pod, its status or its
+   *         phase is not available (for example after the pod was deleted)
+   */
+  public ApplicationStatus getStatus() {
+    Pod operatorPod = kubernetesClient.pods().inNamespace(nameSpace).withName(podName).get();
+    PodStatus podStatus = operatorPod == null ? null : operatorPod.getStatus();
+    String phase = podStatus == null ? null : podStatus.getPhase();
+    if (phase == null) {
+      LOG.warn("No phase available for pod {} in namespace {}, returning the last known status", podName, nameSpace);
+      return currentStatus;
+    }
+    // TODO
+    switch (phase) {
+      case "Pending":
+        currentStatus = ApplicationStatus.New;
+        break;
+      case "Running":
+        currentStatus = Running;
+        break;
+      case "Completed":
+      case "Succeeded":
+        currentStatus = ApplicationStatus.SuccessfulFinish;
+        break;
+      case "Failed":
+        // String.valueOf keeps a missing condition list from throwing while the failure is being reported.
+        String err = "Reason: " + podStatus.getReason() + ", Conditions: " + String.valueOf(podStatus.getConditions());
+        currentStatus = ApplicationStatus.unsuccessfulFinish(new SamzaException(err));
+        break;
+      case "CrashLoopBackOff":
+      case "Unknown":
+      default:
+        currentStatus = ApplicationStatus.New;
+    }
+    return currentStatus;
+  }
+
+  // Build the job coordinator command
+  private String buildJobCoordinatorCmd(String fwkPath, String fwkVersion) {
+    // figure out if we have framework is deployed into a separate location
+    if (fwkVersion == null || fwkVersion.isEmpty()) {
+      fwkVersion = "STABLE";
+    }
+    LOG.info(String.format("KubeJob: fwk_path is %s, ver is %s use it directly ", fwkPath, fwkVersion));
+
+    // default location
+    String cmdExec = DEFAULT_DIRECTORY + "bin/run-jc.sh";
+    if (!fwkPath.isEmpty()) {
+      // if we have framework installed as a separate package - use it
+      cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-jc.sh";
+    }
+    return cmdExec;
+  }
+
+  // Construct the envs for the job coordinator pod
+  private List<EnvVar> getEnvs() {
+    MapConfig coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config);
+    LOG.info("Coordinator system config: {}", coordinatorSystemConfig);
+    List<EnvVar> envList = new ArrayList<>();
+    String coordinatorSysConfig;
+    try {
+      coordinatorSysConfig = SamzaObjectMapper.getObjectMapper().writeValueAsString(coordinatorSystemConfig);
+    } catch (IOException ex) {
+      LOG.warn("No coordinator system configs!", ex);
+      coordinatorSysConfig = "";
+    }
+    envList.add(new EnvVar("SAMZA_COORDINATOR_SYSTEM_CONFIG", Util.envVarEscape(coordinatorSysConfig), null));
+    // Use the same default as the volume mount so the log directory is never exported as a null value.
+    envList.add(new EnvVar("SAMZA_LOG_DIR", config.get(SAMZA_MOUNT_DIR, DEFAULT_SAMZA_MOUNT_DIR), null));
+    envList.add(new EnvVar(COORDINATOR_POD_NAME, podName, null));
+    envList.add(new EnvVar("JAVA_OPTS", "", null));
+    return envList;
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java
new file mode 100644
index 0000000000..52c44b74c1
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.kubernetes;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.job.StreamJobFactory;
+
+/**
+ * A KubeJobFactory returns an implementation of a StreamJob for a Samza application running on Kubernetes.
+ */
+public class KubeJobFactory implements StreamJobFactory {
+
+  @Override
+  public KubeJob getJob(Config config) {
+    return new KubeJob(config);
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java
new file mode 100644
index 0000000000..f01117a3f1
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.kubernetes;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO: SAMZA-2369: Add a logging thread which is similar to LoggingPodStatusWatcher in Spark
+/**
+ * A monitor for the running Kubernetes pod of a Samza application.
+ *
+ * Each wait method blocks on its own latch. A latch is released when an event carries the matching pod phase,
+ * and every latch is released when the pod is deleted, the watch reports an error, or the watch is closed.
+ */
+public class KubePodStatusWatcher implements Watcher<Pod> {
+  private static final Logger LOG = LoggerFactory.getLogger(KubePodStatusWatcher.class);
+
+  private static final String PHASE_PENDING = "Pending";
+  private static final String PHASE_RUNNING = "Running";
+  private static final String PHASE_SUCCEEDED = "Succeeded";
+  private static final String PHASE_FAILED = "Failed";
+
+  private final String appId;
+  private final CountDownLatch podRunningLatch = new CountDownLatch(1);
+  private final CountDownLatch podPendingLatch = new CountDownLatch(1);
+  private final CountDownLatch podSucceededLatch = new CountDownLatch(1);
+  private final CountDownLatch podFailedLatch = new CountDownLatch(1);
+  private final CountDownLatch podCompletedLatch = new CountDownLatch(1);
+  // Last phase carried by an event; stays "unknown" until an event provides one.
+  private String phase = "unknown";
+
+  /**
+   * Creates a watcher for one pod.
+   *
+   * @param appId identifier used in this watcher's log messages
+   */
+  public KubePodStatusWatcher(String appId) {
+    this.appId = appId;
+  }
+
+  /**
+   * Records the phase carried by the event and releases the latches that correspond to it.
+   *
+   * @param action the kind of watch event
+   * @param pod the pod attached to the event
+   */
+  @Override
+  public void eventReceived(Action action, Pod pod) {
+    // Keep the previous phase when the event carries no pod, no status or no phase.
+    phase = Optional.ofNullable(pod)
+        .map(p -> p.getStatus())
+        .map(status -> status.getPhase())
+        .orElse(phase);
+    switch (action) {
+      case DELETED:
+      case ERROR:
+        closeAllWatch();
+        break;
+      default:
+        releaseLatchesForPhase();
+    }
+  }
+
+  /**
+   * Releases every waiter when the client reports that the watch was closed.
+   *
+   * @param e the cause supplied by the client; not inspected here
+   */
+  @Override
+  public void onClose(KubernetesClientException e) {
+    LOG.info("Stopping watching application {} with last-observed phase {}", appId, phase);
+    closeAllWatch();
+  }
+
+  /**
+   * Blocks until the pod has succeeded or failed, the watch has ended, or the timeout elapses.
+   *
+   * @param timeout maximum time to wait
+   * @param unit unit of {@code timeout}
+   */
+  public void waitForCompleted(long timeout, TimeUnit unit) {
+    awaitLatch(podCompletedLatch, timeout, unit, "waitForCompleted");
+  }
+
+  /**
+   * Blocks until the pod phase is Succeeded, the watch has ended, or the timeout elapses.
+   *
+   * @param timeout maximum time to wait
+   * @param unit unit of {@code timeout}
+   */
+  public void waitForSucceeded(long timeout, TimeUnit unit) {
+    awaitLatch(podSucceededLatch, timeout, unit, "waitForSucceeded");
+  }
+
+  /**
+   * Blocks until the pod phase is Failed, the watch has ended, or the timeout elapses.
+   *
+   * @param timeout maximum time to wait
+   * @param unit unit of {@code timeout}
+   */
+  public void waitForFailed(long timeout, TimeUnit unit) {
+    awaitLatch(podFailedLatch, timeout, unit, "waitForFailed");
+  }
+
+  /**
+   * Blocks until the pod phase is Running, the watch has ended, or the timeout elapses.
+   *
+   * @param timeout maximum time to wait
+   * @param unit unit of {@code timeout}
+   */
+  public void waitForRunning(long timeout, TimeUnit unit) {
+    awaitLatch(podRunningLatch, timeout, unit, "waitForRunning");
+  }
+
+  /**
+   * Blocks until the pod phase is Pending, the watch has ended, or the timeout elapses.
+   *
+   * @param timeout maximum time to wait
+   * @param unit unit of {@code timeout}
+   */
+  public void waitForPending(long timeout, TimeUnit unit) {
+    awaitLatch(podPendingLatch, timeout, unit, "waitForPending");
+  }
+
+  // Waits on the latch; on interruption logs the caller's name and restores the thread's interrupt flag.
+  private static void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit, String caller) {
+    try {
+      latch.await(timeout, unit);
+    } catch (InterruptedException e) {
+      LOG.error("{}() was interrupted by exception: ", caller, e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  // Releases the latch for the current phase; a terminal phase also releases the completed latch.
+  private void releaseLatchesForPhase() {
+    // A String switch matches with equals(); == would compare references, not contents.
+    switch (phase) {
+      case PHASE_FAILED:
+        podFailedLatch.countDown();
+        podCompletedLatch.countDown();
+        break;
+      case PHASE_SUCCEEDED:
+        podSucceededLatch.countDown();
+        podCompletedLatch.countDown();
+        break;
+      case PHASE_RUNNING:
+        podRunningLatch.countDown();
+        break;
+      case PHASE_PENDING:
+        podPendingLatch.countDown();
+        break;
+      default:
+        // No waiter corresponds to any other phase.
+        break;
+    }
+  }
+
+  // Releases every latch so that no waiter stays blocked once the watch has ended.
+  private void closeAllWatch() {
+    podPendingLatch.countDown();
+    podRunningLatch.countDown();
+    podSucceededLatch.countDown();
+    podFailedLatch.countDown();
+    podCompletedLatch.countDown();
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java
new file mode 100644
index 0000000000..0351ade057
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.kubernetes;
+
+import org.apache.samza.clustermanager.ClusterResourceManager;
+import org.apache.samza.clustermanager.ResourceManagerFactory;
+import org.apache.samza.clustermanager.SamzaApplicationState;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.JobModelManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A KubeResourceManagerFactory returns an implementation of a {@link ClusterResourceManager} for Kubernetes.
+ */
+public class KubeResourceManagerFactory implements ResourceManagerFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(KubeResourceManagerFactory.class);
+
+  @Override
+  public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) {
+    JobModelManager jobModelManager = state.jobModelManager;
+    Config config = jobModelManager.jobModel().getConfig();
+    KubeClusterResourceManager manager = new KubeClusterResourceManager(config, jobModelManager, callback);
+    LOG.info("KubeClusterResourceManager created");
+    return manager;
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java
new file mode 100644
index 0000000000..666aadea1d
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.kubernetes;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.OwnerReference;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.QuantityBuilder;
+import org.apache.samza.clustermanager.SamzaResourceRequest;
+
+/**
+ * Convenient utility class with static methods.
+ */
+public class KubeUtils {
+
+  // Static helpers only; not meant to be instantiated.
+  private KubeUtils() {
+  }
+
+  /**
+   * Extracts the Samza container id from a pod name.
+   *
+   * @param podName pod name of the form stream-processor-appName-appId-containerId
+   * @return the last element obtained by splitting {@code podName} on "-"
+   */
+  public static String getSamzaContainerNameFromPodName(String podName) {
+    String[] splits = podName.split("-");
+    return splits[splits.length - 1];
+  }
+
+  /**
+   * Builds a single-container pod that carries an owner reference.
+   *
+   * @param name pod name
+   * @param ownerReference owner reference set in the pod metadata
+   * @param restartPolicy restart policy set in the pod spec
+   * @param container the container added to the pod spec
+   * @return the built pod
+   */
+  public static Pod createPod(String name, OwnerReference ownerReference, String restartPolicy, Container container) {
+    return new PodBuilder()
+        .editOrNewMetadata()
+            .withName(name)
+            .withOwnerReferences(ownerReference)
+        .endMetadata()
+        .editOrNewSpec()
+            .withRestartPolicy(restartPolicy)
+            .addToContainers(container)
+        .endSpec()
+        .build();
+  }
+
+  /**
+   * Builds a single-container pod in the given namespace.
+   *
+   * @param name pod name
+   * @param restartPolicy restart policy set in the pod spec
+   * @param container the container added to the pod spec
+   * @param namespace namespace set in the pod metadata
+   * @return the built pod
+   */
+  public static Pod createPod(String name, String restartPolicy, Container container, String namespace) {
+    return new PodBuilder()
+        .editOrNewMetadata()
+            .withNamespace(namespace)
+            .withName(name)
+        .endMetadata()
+        .editOrNewSpec()
+            .withRestartPolicy(restartPolicy)
+            .addToContainers(container)
+        .endSpec()
+        .build();
+  }
+
+  /**
+   * Builds the container definition used for the Samza operator.
+   *
+   * @param containerId name given to the container
+   * @param image container image; the pull policy is set to "Always"
+   * @param resourceRequest supplies the memory (MB) and CPU core counts requested for the container
+   * @param cmd command the container runs
+   * @return the built container
+   */
+  public static Container createContainer(String containerId, String image, SamzaResourceRequest resourceRequest,
+      String cmd) {
+    // NOTE(review): the memory amount and its "Mi" suffix are set separately; confirm the client serializes both.
+    Quantity memQuantity = new QuantityBuilder(false)
+        .withAmount(String.valueOf(resourceRequest.getMemoryMB())).withFormat("Mi").build();
+    Quantity cpuQuantity = new QuantityBuilder(false)
+        .withAmount(String.valueOf(resourceRequest.getNumCores())).build();
+    return new ContainerBuilder()
+        .withName(containerId)
+        .withImage(image)
+        .withImagePullPolicy("Always")
+        .withCommand(cmd)
+        .editOrNewResources()
+            .addToRequests("memory", memQuantity)
+            .addToRequests("cpu", cpuQuantity)
+        .endResources()
+        .build();
+  }
+
+  // TODO: SAMZA-2371: add util methods (similar to KubernetesUtils in Spark) describing details about pod status and
+  // container status, then we can use these methods in logs and exception messages.
+}
diff --git a/settings.gradle b/settings.gradle
index cf4c9be7a3..e3f98d2974 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -29,6 +29,7 @@ def scalaModules = [
   'samza-elasticsearch',
   'samza-hdfs',
   'samza-kafka',
+  'samza-kubernetes',
   'samza-kv',
   'samza-kv-inmemory',
   'samza-kv-rocksdb',