Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2067: Support Samza's running on Kubernetes #1197

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,27 @@ project(":samza-yarn_$scalaSuffix") {
jar.dependsOn("lesscss")
}

// Build definition for the samza-kubernetes module (one project per Scala suffix).
project(":samza-kubernetes_$scalaSuffix") {
apply plugin: 'java'

// Compile against samza-api and samza-core, plus Jackson and the fabric8 Kubernetes client;
// the client version comes from kubernetesJavaClientVersion.
dependencies {
compile project(':samza-api')
compile project(":samza-core_$scalaSuffix")
compile "org.codehaus.jackson:jackson-core-asl:1.9.7"
compile group: 'io.fabric8', name: 'kubernetes-client', version: kubernetesJavaClientVersion
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
}

// Packages the runtime dependencies and this module's archive artifacts into lib/ inside a
// gzip-compressed tarball rooted at samza-kubernetes-<version>; duplicate entries are excluded.
tasks.create(name: "releaseKubeTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
into "samza-kubernetes-${version}"
compression = Compression.GZIP
from(configurations.runtime) { into("lib/") }
from(configurations.archives.artifacts.files) { into("lib/") }
duplicatesStrategy 'exclude'
}
}

project(":samza-shell") {
apply plugin: 'java'

Expand Down
1 change: 1 addition & 0 deletions gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,5 @@
jnaVersion = "4.5.1"
couchbaseClientVersion = "2.7.2"
couchbaseMockVersion = "1.5.22"
kubernetesJavaClientVersion = "4.1.3"
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri
hostAffinityEnabled, jobConfig.getStandbyTasksEnabled(), localityManager, faultDomainManager, config);

this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.containerManager);
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
if (shouldStartAllocateThread()) {
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually use containerAllocator anymore? Please walk through this part of the code with me offline.

} else {
this.allocatorThread = null;
}
this.restartContainers = restartContainers;
LOG.info("Finished container process manager initialization.");
}
Expand Down Expand Up @@ -211,15 +215,23 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri
LOG.info("Finished container process manager initialization");
}

// In Kubernetes, the pod is started by the kubelet automatically once it is allocated, so no
// separate thread is needed to keep polling the allocated resources to start the container.
public boolean shouldStartAllocateThread() {
return !clusterResourceManager.getClass().getSimpleName().equals("KubeClusterResourceManager");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lookup of a specific cluster manager seems pretty hard to maintain. Can we think of a better way to distinguish when we need this to do container allocation?

}

public boolean shouldShutdown() {
LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}, Is allocator thread alive: {}",
state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no");
state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no", allocatorThread != null && allocatorThread.isAlive() ? "yes" : "no");

if (exceptionOccurred != null) {
LOG.error("Exception in container process manager", exceptionOccurred);
throw new SamzaException(exceptionOccurred);
}
return jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get() || !allocatorThread.isAlive();

boolean shouldShutdown = jobFailureCriteriaMet || state.completedProcessors.get() == state.processorCount.get();
return allocatorThread == null ? shouldShutdown : (shouldShutdown || !allocatorThread.isAlive());
}

public void start() {
Expand Down Expand Up @@ -278,7 +290,9 @@ public void start() {

// Start container allocator thread
LOG.info("Starting the container allocator thread");
allocatorThread.start();
if (allocatorThread != null) {
allocatorThread.start();
}
LOG.info("Starting the container process manager");
}

Expand All @@ -287,12 +301,14 @@ public void stop() {

// Shutdown allocator thread
containerAllocator.stop();
try {
allocatorThread.join();
LOG.info("Stopped container allocator");
} catch (InterruptedException ie) {
LOG.error("Allocator thread join threw an interrupted exception", ie);
Thread.currentThread().interrupt();
if (allocatorThread != null) {
try {
allocatorThread.join();
LOG.info("Stopped container allocator");
} catch (InterruptedException ie) {
LOG.error("Allocator thread join threw an interrupted exception", ie);
Thread.currentThread().interrupt();
}
}

if (diagnosticsManager.isDefined()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class HttpServer(
}

/**
* Returns the URL for the root of the HTTP server. This method
* Returns the URL for the root of the HTTP server. This URL is generated with host name.
*/
def getUrl = {
if (running) {
Expand All @@ -136,4 +136,17 @@ class HttpServer(
throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.")
}
}

/**
 * Builds the root URL of this HTTP server from the local host's address (as opposed to
 * its host name) and the local port of the server's first connector.
 *
 * Throws a SamzaException when the server is not running.
 */
def getIpUrl = {
  if (!running) {
    throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.")
  }

  // Resolve the port before the host address, mirroring the original evaluation order.
  val boundPort = server.getConnectors()(0).asInstanceOf[NetworkConnector].getLocalPort()
  val hostAddress = Util.getLocalHost.getHostAddress

  new URL(s"http://$hostAddress:$boundPort$rootPath")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object HttpUtil extends Logging {
(exception, loop) => {
exception match {
case ioe: IOException => {
warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass)
error("Error getting response from Job coordinator server. Received IOException: %s. Retrying..." format ioe)
httpConn = getHttpConnection(url, timeout)
}
case e: Exception =>
Expand Down
34 changes: 34 additions & 0 deletions samza-kubernetes/src/docker/dockerfiles/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# samzaJarsFolder includes all the Samza jars (you need to make sure all the Samza jars are there).
# You can build Samza image by:
# docker build -t dockerHubAccount/samza:versionNumber .
# Then Samza user can use the Samza image as base image to build their application image.
#

# Base image for the Samza image.
# NOTE(review): `latest` is a floating tag, so rebuilds may pick up a different Ubuntu release — consider pinning.
FROM ubuntu:latest

# Refresh the package index, upgrade installed packages, and install the OpenJDK 8 JDK.
RUN apt-get update -y && apt-get upgrade -y && apt-get install -y openjdk-8-jdk

# Point JAVA_HOME at the installed JDK and append its bin directory to PATH.
# NOTE(review): the JAVA_HOME path is amd64-specific — confirm if other architectures must be supported.
ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64
ENV PATH $PATH:$JAVA_HOME/bin

# Create /opt/samza, make it the working directory, and copy the Samza jars from the build context into it.
RUN mkdir -p /opt/samza
WORKDIR /opt/samza/
COPY samzaJarsFolder/ /opt/samza/
26 changes: 26 additions & 0 deletions samza-kubernetes/src/main/java/org/apache/samza/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

# Configurations
- kube.app.image: the image name for the samza app
- kube.app.namespace: the namespace where the samza app runs
- kube.app.pod.mnt.path: the path where the remote volume is mounted into the pod, for both the job coordinator pod and the stream processor pod.
  The volume can be used for storing logs and local state.
- cluster-manager.container.memory.mb: the memory size for the samza stream processor
- cluster-manager.container.cpu.cores: the cpu cores for the samza stream processor
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.config;

/**
* Kubernetes related configurations
*/
public class KubeConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest following the other Config class example to add getters for common configs, like namespace, jcPodName, etc. It's easier to centralize the place where these things are created. The rest of the code should just use these getters normally instead of directly accessing the config vars. It will be even better to make those static vars private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good. will update this class.


// Config key for the image name of the Samza app.
// NOTE(review): DEFAULT_IMAGE is presumably the fallback when the key is unset — confirm at the call site.
public static final String APP_IMAGE = "kube.app.image";
public static final String DEFAULT_IMAGE = "samza/samza:v0";

// The default working directory.
public static final String DEFAULT_DIRECTORY = "/opt/samza/";

// Default memory size and CPU core count for a container.
// NOTE(review): the memory value is presumably in MB (cf. cluster-manager.container.memory.mb) — confirm.
public static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_MEM_SIZE = 1024;
public static final int DEFAULT_CLUSTER_MANAGER_CONTAINER_CPU_CORE_NUM = 1;

// Config key for the path where the remote volume is mounted into the pod (job coordinator and
// stream processor pods alike); the volume can hold logs and local state.
public static final String SAMZA_MOUNT_DIR = "kube.app.pod.mnt.path";
public static final String DEFAULT_SAMZA_MOUNT_DIR = "/tmp/mnt";
// Config key for the namespace where the Samza app runs.
public static final String K8S_API_NAMESPACE = "kube.app.namespace";
// Name prefixes: "sp" for stream processors, "jc" for the job coordinator.
// NOTE(review): presumably these fill the first placeholder of the pod name formats below — confirm.
public static final String STREAM_PROCESSOR_CONTAINER_NAME_PREFIX = "sp";
public static final String JC_CONTAINER_NAME_PREFIX = "jc";
// Restart policy value for pods. NOTE(review): a fixed value, not a config key — confirm this is intended.
public static final String POD_RESTART_POLICY = "OnFailure";
public static final String JC_POD_NAME_FORMAT = "%s-%s-%s"; // jc-appName-appId
public static final String TASK_POD_NAME_FORMAT = "%s-%s-%s-%s"; // sp-appName-appId-containerId

// Environment variable name.
// NOTE(review): presumably carries the coordinator pod's name — confirm where it is set and read.
public static final String COORDINATOR_POD_NAME = "COORDINATOR_POD_NAME";
// Config keys (and defaults) for the Azure file-share volume: enable flag, secret name, share name.
// NOTE(review): meanings inferred from the key names — confirm against the code that reads them.
public static final String AZURE_REMOTE_VOLUME_ENABLED = "kube.app.volume.azure.file-share.enabled";
public static final String AZURE_SECRET = "kube.app.volume.azure-secret";
public static final String DEFAULT_AZURE_SECRET = "azure-secret";
public static final String AZURE_FILESHARE = "kube.app.volume.azure.file-share";
public static final String DEFAULT_AZURE_FILESHARE = "aksshare";

// The wrapped job configuration; assigned once in the constructor and never reassigned.
private final Config config;

/**
 * Creates a Kubernetes config view over the given job configuration.
 *
 * @param config the configuration to wrap; stored as-is, without copying
 */
public KubeConfig(Config config) {
  this.config = config;
}

/**
 * Wraps the given configuration in a {@link KubeConfig} and runs its validation.
 *
 * @param config the configuration to wrap and validate
 * @return the {@link KubeConfig} built from {@code config}
 * @throws ConfigException if validation fails
 */
public static KubeConfig validate(Config config) throws ConfigException {
  final KubeConfig validated = new KubeConfig(config);
  validated.validate();
  return validated;
}

/**
 * Validation hook for this config. Currently a no-op: the body is empty, so no
 * {@link ConfigException} is ever thrown from here.
 *
 * @throws ConfigException declared for future validation failures
 */
// TODO: SAMZA-2365: validate KubeConfig before starting the job
private void validate() throws ConfigException {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.job.kubernetes;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory that hands out {@link KubernetesClient} instances backed by a
 * {@link DefaultKubernetesClient}.
 */
public class KubeClientFactory {
  private static final Logger LOG = LoggerFactory.getLogger(KubeClientFactory.class);

  /**
   * Creates a new client from the {@link Config} produced by an unmodified
   * {@link ConfigBuilder}, and logs its creation.
   *
   * @return the newly created client
   */
  public static KubernetesClient create() {
    final Config defaultConfig = new ConfigBuilder().build();
    final KubernetesClient kubernetesClient = new DefaultKubernetesClient(defaultConfig);
    LOG.info("Kubernetes client created. ");
    return kubernetesClient;
  }
}
Loading