Skip to content

Commit

Permalink
Add a containerized mode to the ECM service (apache#5201)
Browse files Browse the repository at this point in the history
* Add a containerized mode to the ECM service, which allows assigning specific IPs and ports for communication with the outside world to particular engines in this mode. For instance, a Spark engine requires at least two ports: spark.driver.port and spark.driver.blockManager.port.

* Supplement the SQL for database modification

* remove code that may cause compilation to fail

* improve the linkis_ddl.sql in configmap-init-sql.yaml

* Add all parameters from SparkConf.getConf to sparkLauncher.

* Add a method addAllConf to SparkConf.

* formatted code

* Fix the missing ticketId field in the linkis_cg_manager_service_instance within the configmap-init-sql.yaml file.

* Fix the missing observe_info field in the linkis_ps_job_history_group_history within the configmap-init-sql.yaml file.

* print lm log

* add debug log

* Add fields(mapping_host, mapping_ports) to the linkis_cg_manager_service_instance for ddl file.

* remove debug action

---------

Co-authored-by: peacewong <peacewong@apache.org>
  • Loading branch information
sjgllgh and peacewong authored Nov 27, 2024
1 parent 9b5a34c commit bca9fe2
Show file tree
Hide file tree
Showing 24 changed files with 500 additions and 5 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ jobs:
# Show port-forward list
bash ./linkis-dist/helm/scripts/remote-proxy.sh list
# Check if the web service is available
curl http://127.0.0.1:8088/indexhtml
curl http://127.0.0.1:8088/
# Execute test by linkis-cli
POD_NAME=`kubectl get pods -n linkis -l app.kubernetes.io/instance=linkis-demo-mg-gateway -o jsonpath='{.items[0].metadata.name}'`
Expand All @@ -182,5 +182,4 @@ jobs:
#kubectl exec -it -n linkis ${POD_NAME} -- bash -c " \
#sh /opt/linkis/bin/linkis-cli -engineType spark-3.2.1 -codeType sql -code 'show databases' "
shell: bash
shell: bash
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ class ServiceInstance {
private var applicationName: String = _
private var instance: String = _
private var registryTimestamp: Long = _
private var mappingPorts: String = _
private var mappingHost: String = _
def setApplicationName(applicationName: String): Unit = this.applicationName = applicationName
def getApplicationName: String = applicationName
def setInstance(instance: String): Unit = this.instance = instance
def getInstance: String = instance

def setMappingPorts(mappingPorts: String): Unit = this.mappingPorts = mappingPorts
def getMappingPorts: String = mappingPorts

def setMappingHost(mappingHost: String): Unit = this.mappingHost = mappingHost
def getMappingHost: String = mappingHost

def setRegistryTimestamp(registryTimestamp: Long): Unit = this.registryTimestamp =
registryTimestamp

Expand Down Expand Up @@ -62,6 +70,18 @@ object ServiceInstance {
serviceInstance
}

def apply(
applicationName: String,
instance: String,
mappingPorts: String,
mappingHost: String
): ServiceInstance = {
val serviceInstance = apply(applicationName, instance)
serviceInstance.setMappingPorts(mappingPorts)
serviceInstance.setMappingHost(mappingHost)
serviceInstance
}

def apply(applicationName: String, instance: String, registryTimestamp: Long): ServiceInstance = {
val serviceInstance = new ServiceInstance
serviceInstance.setApplicationName(applicationName)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.ecm.core.containerization.enums;

public enum MappingPortStrategyName {
STATIC("static");
private String name;

MappingPortStrategyName(String name) {
this.name = name;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public static MappingPortStrategyName toEnum(String name) {
return MappingPortStrategyName.valueOf(name.toUpperCase());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.ecm.core.containerization.strategy;

import org.apache.linkis.ecm.core.containerization.enums.MappingPortStrategyName;

public class MappingPortContext {

public static MappingPortStrategy getInstance(MappingPortStrategyName strategyName) {
switch (strategyName) {
case STATIC:
return new StaticMappingPortStrategy();
default:
return new StaticMappingPortStrategy();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.ecm.core.containerization.strategy;

import java.io.IOException;

public interface MappingPortStrategy {
int availablePort() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.ecm.core.containerization.strategy;

import org.apache.linkis.ecm.core.conf.ContainerizationConf;

import org.apache.commons.io.IOUtils;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class StaticMappingPortStrategy implements MappingPortStrategy {

private static final AtomicInteger currentIndex = new AtomicInteger(0);

@Override
public int availablePort() throws IOException {
return getNewPort(10);
}

public int getNewPort(int retryNum) throws IOException {
int[] portRange = getPortRange();
if (retryNum == 0) {
throw new IOException(
"No available port in the portRange: "
+ ContainerizationConf.ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE()
.getValue());
}
moveIndex();
int minPort = portRange[0];
int newPort = minPort + currentIndex.get() - 1;
ServerSocket socket = null;
try {
socket = new ServerSocket(newPort);
} catch (Exception e) {
return getNewPort(--retryNum);
} finally {
IOUtils.close(socket);
}
return newPort;
}

private synchronized void moveIndex() {
int poolSize = getPoolSize();
currentIndex.set(currentIndex.get() % poolSize + 1);
}

private int[] getPortRange() {
String portRange =
ContainerizationConf.ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE().getValue();

return Arrays.stream(portRange.split("-")).mapToInt(Integer::parseInt).toArray();
}

private int getPoolSize() {
int[] portRange = getPortRange();
int minPort = portRange[0];
int maxPort = portRange[1];

return maxPort - minPort + 1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.ecm.core.conf

import org.apache.linkis.common.conf.CommonVars

object ContainerizationConf {

val ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE =
CommonVars("linkis.engine.containerization.static.port.range", "1-65535")

val ENGINE_CONN_CONTAINERIZATION_ENABLE =
CommonVars("linkis.engine.containerization.enable", false).getValue

val ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST =
CommonVars("linkis.engine.containerization.mapping.host", "")

val ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS =
CommonVars("linkis.engine.containerization.mapping.ports", "")

val ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY =
CommonVars("linkis.engine.containerization.mapping.strategy", "static")

// 引擎类型-需要开启的端口数量
// Engine Type - Number of Ports Required to Be Opened
val ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST =
CommonVars("linkis.engine.containerization.engine.list", "spark-2,")

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ package org.apache.linkis.ecm.core.launch
import org.apache.linkis.common.conf.{CommonVars, Configuration}
import org.apache.linkis.common.exception.ErrorException
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.ecm.core.conf.ContainerizationConf.{
ENGINE_CONN_CONTAINERIZATION_ENABLE,
ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST,
ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST,
ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS,
ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY
}
import org.apache.linkis.ecm.core.containerization.enums.MappingPortStrategyName
import org.apache.linkis.ecm.core.containerization.strategy.MappingPortContext
import org.apache.linkis.ecm.core.errorcode.LinkisECMErrorCodeSummary._
import org.apache.linkis.ecm.core.exception.ECMCoreException
import org.apache.linkis.ecm.core.utils.PortUtils
Expand All @@ -35,6 +44,7 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.{
}
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment._
import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants._
import org.apache.linkis.manager.label.utils.LabelUtil

import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
Expand All @@ -54,6 +64,9 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {

private var engineConnPort: String = _

private var mappingPorts: String = ""
private var mappingHost: String = _

protected def newProcessEngineConnCommandBuilder(): ProcessEngineCommandBuilder =
new UnixProcessEngineCommandBuilder

Expand Down Expand Up @@ -142,6 +155,10 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {

def getEngineConnPort: String = engineConnPort

def getMappingPorts: String = mappingPorts

def getMappingHost: String = mappingHost

protected def getProcess(): Process = this.process

/**
Expand All @@ -166,6 +183,20 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {
.findAvailPortByRange(GovernanceCommonConf.ENGINE_CONN_PORT_RANGE.getValue)
.toString

val engineType = LabelUtil.getEngineType(request.labels)
var engineMappingPortSize = getEngineMappingPortSize(engineType)
if (ENGINE_CONN_CONTAINERIZATION_ENABLE && engineMappingPortSize > 0) {
val strategyName = ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY.getValue
val mappingPortStrategy =
MappingPortContext.getInstance(MappingPortStrategyName.toEnum(strategyName))

while (engineMappingPortSize > 0) {
mappingPorts += mappingPortStrategy.availablePort() + ","
engineMappingPortSize = engineMappingPortSize - 1
}
mappingHost = ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST.getValue
}

var springConf =
Map[String, String]("server.port" -> engineConnPort, "spring.profiles.active" -> "engineconn")
val properties =
Expand All @@ -188,10 +219,24 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {
engineConnConf = engineConnConf ++: request.creationDesc.properties.asScala
.filterNot(_._1.startsWith("spring."))
.toMap

engineConnConf += (ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS.key -> mappingPorts)
engineConnConf += (ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST.key -> mappingHost)

arguments.addEngineConnConf(engineConnConf)
EngineConnArgumentsParser.getEngineConnArgumentsParser.parseToArgs(arguments.build())
}

def getEngineMappingPortSize(engineType: String): Int = {
val engineList = ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST.getValue
val infoList = engineList.trim
.split(",")
.map(_.split("-"))
.filter(engine => engine(0).equals(engineType))
if (infoList.length > 0) infoList(0)(1).toInt
else 0
}

override def kill(): Unit = {
if (process != null) {
process.destroy()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
case pro: ProcessEngineConnLaunch =>
val serviceInstance = ServiceInstance(
GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue,
ECMUtils.getInstanceByPort(pro.getEngineConnPort)
ECMUtils.getInstanceByPort(pro.getEngineConnPort),
pro.getMappingPorts,
pro.getMappingHost
)
conn.setServiceInstance(serviceInstance)
case _ =>
Expand Down
Loading

0 comments on commit bca9fe2

Please sign in to comment.