[WIP] Remove Py4j: Use Ray cross language to start AppMaster #161

Open · wants to merge 6 commits into master · changes shown from all commits
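
Context for reviewers: RayDP previously launched the Scala RayAppMaster through a Py4j gateway owned by the Python driver. With this change the AppMaster is started as a Ray Java actor via Ray's cross-language support, so the py4j dependency and the gateway bridge go away. A minimal sketch of the Python-side pattern, assuming the `ray.java_actor_class` API of Ray releases from this era; the fully qualified class name and the exact call site (ray_cluster_master.py, not shown in this diff) are assumptions:

```python
import ray
from ray.job_config import JobConfig
from raydp import get_code_search_path

# Ray's Java workers need the RayDP jars on their code search path so the
# cross-language actor class can be resolved (see context.py below).
ray.init(job_config=JobConfig(java_code_search_path=get_code_search_path()))

# Hypothetical illustration: start the Scala AppMaster as a Ray Java actor
# instead of going through a Py4j gateway. The class name is assumed.
RayAppMaster = ray.java_actor_class("org.apache.spark.deploy.raydp.RayAppMaster")
app_master = RayAppMaster.remote()
master_url = ray.get(app_master.getMasterUrl.remote())  # e.g. "ray://..."
```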
7 changes: 0 additions & 7 deletions core/pom.xml
@@ -66,13 +66,6 @@
      <scope>provided</scope>
    </dependency>

-    <dependency>
-      <groupId>net.sf.py4j</groupId>
-      <artifactId>py4j</artifactId>
-      <version>0.10.9.2</version>
-      <scope>provided</scope>
-    </dependency>
-
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
RayAppMasterUtils.java
@@ -24,8 +24,8 @@

public class RayAppMasterUtils {
  public static ActorHandle<RayAppMaster> createAppMaster(
-      String cp, List<String> jvmOptions) {
-    return Ray.actor(RayAppMaster::new, cp).setJvmOptions(jvmOptions).remote();
+      List<String> jvmOptions) {
+    return Ray.actor(RayAppMaster::new).setJvmOptions(jvmOptions).remote();
  }

  public static String getMasterUrl(
@@ -35,7 +35,6 @@ public static String getMasterUrl(

  public static void stopAppMaster(
      ActorHandle<RayAppMaster> handle) {
-    handle.task(RayAppMaster::stop).remote().get();
-    handle.kill();
+    handle.task(RayAppMaster::stop).remote();
  }
}
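
Note the new shutdown protocol in stopAppMaster: the caller no longer blocks on get() or calls handle.kill(), because stop() now ends with Ray.exitActor() (see RayAppMaster.scala below) and the actor tears itself down. Presumably blocking on a task whose own actor exits mid-task is what made the old .get() fragile. The same self-exit pattern in Python, purely for illustration:

```python
import ray

ray.init()

@ray.remote
class SelfStoppingActor:
    def stop(self):
        # Release resources here, then terminate this actor process from
        # the inside; the caller never needs handle.kill().
        ray.actor.exit_actor()

actor = SelfStoppingActor.remote()
actor.stop.remote()  # fire and forget: no ray.get(), mirroring the Java change
```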

This file was deleted.

This file was deleted.

RayAppMaster.scala
@@ -35,22 +35,15 @@ import org.apache.spark.rpc._
import org.apache.spark.util.ShutdownHookManager
import org.apache.spark.util.Utils

-class RayAppMaster(host: String,
-                   port: Int,
-                   actor_extra_classpath: String) extends Serializable with Logging {
+class RayAppMaster() extends Serializable with Logging {
  private var endpoint: RpcEndpointRef = _
  private var rpcEnv: RpcEnv = _
  private val conf: SparkConf = new SparkConf()

-  init()
-
-  def this() = {
-    this(RayConfig.create().nodeIp, 0, "")
-  }
+  private val host: String = RayConfig.create().nodeIp
+  private var actorExtraClasspath: String = _

-  def this(actor_extra_classpath: String) = {
-    this(RayConfig.create().nodeIp, 0, actor_extra_classpath)
-  }
+  init()

  def init(): Unit = {
    Utils.loadDefaultSparkProperties(conf)
@@ -59,7 +52,7 @@ class RayAppMaster(host: String,
      RayAppMaster.ENV_NAME,
      host,
      host,
-      port,
+      0,
      conf,
      securityMgr,
      numUsableCores = 0,
@@ -68,6 +61,11 @@
    endpoint = rpcEnv.setupEndpoint(RayAppMaster.ENDPOINT_NAME, new RayAppMasterEndpoint(rpcEnv))
  }

+  def setActorClasspath(cp: String): Int = {
+    actorExtraClasspath = cp
+    0
+  }
+
  /**
   * Get the app master endpoint URL. The executor will connect to the AppMaster at this URL and
   * tell the AppMaster that it has started up successfully.
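
setActorClasspath replaces the constructor argument that createAppMaster used to forward; the scheduler backend presumably calls it right after the actor is created (only the createAppMaster signature change is visible in this diff). Returning Int instead of Unit gives a cross-language caller a concrete value to block on. A hedged Python-side illustration, reusing the assumed handle from the sketch at the top of this PR:

```python
import ray

# Assumed cross-language handle; the class name is an assumption.
app_master = ray.java_actor_class(
    "org.apache.spark.deploy.raydp.RayAppMaster").remote()

# Blocking on the returned 0 confirms the classpath has been recorded
# before the master is asked to launch executor processes.
assert ray.get(app_master.setActorClasspath.remote("/path/to/extra/jars")) == 0
```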
@@ -84,14 +82,14 @@
    url.replace("spark", "ray")
  }

-  def stop(): Int = {
+  def stop(): Unit = {
    logInfo("Stopping RayAppMaster")
    if (rpcEnv != null) {
      rpcEnv.shutdown()
      endpoint = null
      rpcEnv = null
    }
-    0
+    Ray.exitActor()
  }

  class RayAppMasterEndpoint(override val rpcEnv: RpcEnv)
@@ -244,10 +242,10 @@ class RayAppMaster(host: String,
s"Found ${javaOpts(i - 1)} while not classpath url in executor java opts")
}

javaOpts.updated(i, javaOpts(i) + File.pathSeparator + actor_extra_classpath)
javaOpts.updated(i, javaOpts(i) + File.pathSeparator + actorExtraClasspath)
} else {
// user has not set, we append the actor extra classpath in the end
javaOpts ++ Seq("-cp", actor_extra_classpath)
javaOpts ++ Seq("-cp", actorExtraClasspath)
}
}

RayCoarseGrainedSchedulerBackend.scala
@@ -69,7 +69,7 @@ class RayCoarseGrainedSchedulerBackend(
      Ray.init()
      val cp = sys.props("java.class.path")
      val options = RayExternalShuffleService.getShuffleConf(conf)
-      masterHandle = RayAppMasterUtils.createAppMaster(cp, options.toBuffer.asJava)
+      masterHandle = RayAppMasterUtils.createAppMaster(options.toBuffer.asJava)
      uri = new URI(RayAppMasterUtils.getMasterUrl(masterHandle))
    } else {
      uri = new URI(sparkUrl)
4 changes: 2 additions & 2 deletions python/raydp/__init__.py
@@ -16,7 +16,7 @@
#

from raydp.context import init_spark, stop_spark
-
+from raydp.utils import get_code_search_path
__version__ = "0.4.0.dev0"

-__all__ = ["init_spark", "stop_spark"]
+__all__ = ["init_spark", "stop_spark", "get_code_search_path"]
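
Exporting get_code_search_path suggests that users who initialize Ray themselves, before calling init_spark, are expected to wire the search path in manually (init_spark only does it when it performs the ray.init call itself, as context.py shows below). A hedged usage sketch:

```python
import ray
from ray.job_config import JobConfig
from raydp import get_code_search_path, init_spark

# Hypothetical: connect to an existing Ray cluster before init_spark runs,
# supplying the jar locations that ray.init would otherwise have set.
ray.init(address="auto",
         job_config=JobConfig(java_code_search_path=get_code_search_path()))
spark = init_spark("my_app", num_executors=2, executor_cores=2,
                   executor_memory="2GB")
```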
10 changes: 5 additions & 5 deletions python/raydp/context.py
@@ -15,17 +15,18 @@
# limitations under the License.
#

-import atexit
+import atexit, os
from contextlib import ContextDecorator
from threading import RLock
from typing import Dict, Union, Optional

import ray
+from ray.job_config import JobConfig
import pyspark
from pyspark.sql import SparkSession

from raydp.spark import SparkCluster
-from raydp.utils import parse_memory_size
-
+from raydp.utils import get_code_search_path, parse_memory_size

class _SparkContext(ContextDecorator):
    """A class used to create the Spark cluster and get the Spark session.
@@ -111,8 +112,7 @@ def init_spark(app_name: str,

    if not ray.is_initialized():
        # Ray has not been initialized yet, so start a local instance
-        ray.init()
-
+        ray.init(job_config=JobConfig(java_code_search_path=get_code_search_path()))
    with _spark_context_lock:
        global _global_spark_context
        if _global_spark_context is None:
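
get_code_search_path itself lives in raydp/utils.py and is not part of this diff; a plausible reconstruction of what such a helper returns, assuming the wheel bundles its jars in a jars/ directory inside the package:

```python
import os

def get_code_search_path():
    # Hypothetical reconstruction: return the directory holding the bundled
    # RayDP/Spark jars so Ray's Java workers can load classes such as
    # RayAppMaster when the cross-language actor is created.
    raydp_home = os.path.dirname(os.path.abspath(__file__))
    return [os.path.join(raydp_home, "jars")]
```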
26 changes: 13 additions & 13 deletions python/raydp/spark/ray_cluster.py
@@ -15,34 +15,34 @@
# limitations under the License.
#

-import glob
+import glob, os
from typing import Any, Dict

import ray
from pyspark.sql.session import SparkSession

from raydp.services import Cluster
-from .ray_cluster_master import RayClusterMaster, RAYDP_CP
+from .ray_cluster_master import RayClusterMaster, RAYDP_JARS


class SparkCluster(Cluster):
    def __init__(self, configs):
        super().__init__(None)
-        self._app_master_bridge = None
+        self._app_master = None
        self._configs = configs
        self._set_up_master(None, None)
        self._spark_session: SparkSession = None

    def _set_up_master(self, resources: Dict[str, float], kwargs: Dict[Any, Any]):
        # TODO: specify the app master resource
-        self._app_master_bridge = RayClusterMaster(self._configs)
-        self._app_master_bridge.start_up()
+        self._app_master = RayClusterMaster(self._configs)
+        self._app_master.start_up()

    def _set_up_worker(self, resources: Dict[str, float], kwargs: Dict[str, str]):
        raise Exception("Unsupported operation")

    def get_cluster_url(self) -> str:
-        return self._app_master_bridge.get_master_url()
+        return self._app_master.get_master_url()

    def get_spark_session(self,
                          app_name: str,
@@ -58,19 +58,19 @@ def get_spark_session(self,
extra_conf["spark.executor.instances"] = str(num_executors)
extra_conf["spark.executor.cores"] = str(executor_cores)
extra_conf["spark.executor.memory"] = str(executor_memory)
driver_node_ip = ray.services.get_node_ip_address()
driver_node_ip = ray.util.get_node_ip_address()
extra_conf["spark.driver.host"] = str(driver_node_ip)
extra_conf["spark.driver.bindAddress"] = str(driver_node_ip)
try:
extra_jars = [extra_conf["spark.jars"]]
except KeyError:
extra_jars = []
extra_conf["spark.jars"] = ",".join(glob.glob(RAYDP_CP) + extra_jars)
extra_conf["spark.jars"] = ",".join(glob.glob(RAYDP_JARS) + extra_jars)
driver_cp = "spark.driver.extraClassPath"
if driver_cp in extra_conf:
extra_conf[driver_cp] = ":".join(glob.glob(RAYDP_CP)) + ":" + extra_conf[driver_cp]
extra_conf[driver_cp] = ":".join(glob.glob(RAYDP_JARS)) + ":" + extra_conf[driver_cp]
else:
extra_conf[driver_cp] = ":".join(glob.glob(RAYDP_CP))
extra_conf[driver_cp] = ":".join(glob.glob(RAYDP_JARS))
spark_builder = SparkSession.builder
for k, v in extra_conf.items():
spark_builder.config(k, v)
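
An incidental fix in this hunk: ray.services was an internal module that later Ray releases drop, while the same helper is public under ray.util. For illustration:

```python
import ray

ray.init()
# Public location of the helper used above to pin the Spark driver's
# host and bind address to this node's IP.
print(ray.util.get_node_ip_address())  # e.g. "192.168.0.12" (sample output)
```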
@@ -83,6 +83,6 @@ def stop(self):
            self._spark_session.stop()
            self._spark_session = None

-        if self._app_master_bridge is not None:
-            self._app_master_bridge.stop()
-            self._app_master_bridge = None
+        if self._app_master is not None:
+            self._app_master.stop()
+            self._app_master = None
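
End to end, the public API is unchanged; after this PR init_spark wires the code search path into ray.init (when Ray is not yet initialized), starts the AppMaster via cross language, and hands back a plain SparkSession. A short usage sketch with illustrative argument values:

```python
import raydp

spark = raydp.init_spark(app_name="raydp_example", num_executors=2,
                         executor_cores=2, executor_memory="2GB")
df = spark.createDataFrame([("ray", 1), ("spark", 2)], ["word", "count"])
print(df.count())  # 2
raydp.stop_spark()
```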