-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Multi-Job][2/N] Make proxy's RPC framework replaceable #140
Changes from 23 commits
c8dd31f
ef842a9
ccf7d51
a046d27
aa397a8
863da5f
40cec65
ed18a07
51e4e70
8ff5658
582f523
cdc1996
b5a3f08
a432d31
7ffe367
2f9ce57
dec6153
ae6599c
cd08616
e2de5d6
2ab6483
aebf107
0621cf9
8818c8f
922cd5b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,7 +63,8 @@ def find_class(self, module, name): | |
def _apply_loads_function_with_whitelist(): | ||
global _pickle_whitelist | ||
|
||
_pickle_whitelist = fed_config.get_cluster_config().serializing_allowed_list | ||
_pickle_whitelist = fed_config.get_job_config() \ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As discussed, we should rename the function to |
||
.cross_silo_comm_config.serializing_allowed_list | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be job config? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that we separate Unless there's a global actor (or service job) that can break the job isolation and filter each job's tasks' invalid param type. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we don't have the cluster config, why we not just use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's discuss in #156 |
||
if _pickle_whitelist is None: | ||
return | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -15,7 +15,7 @@ | |||||
import functools | ||||||
import inspect | ||||||
import logging | ||||||
from typing import Any, Dict, List, Union | ||||||
from typing import Any, Dict, List, Union, Optional | ||||||
|
||||||
import cloudpickle | ||||||
import ray | ||||||
|
@@ -34,6 +34,8 @@ | |||||
start_recv_proxy, | ||||||
start_send_proxy, | ||||||
) | ||||||
from fed.proxy.grpc.grpc_proxy import SendProxy, RecvProxy | ||||||
from fed.config import CrossSiloMsgConfig | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
from fed.fed_object import FedObject | ||||||
from fed.utils import is_ray_object_refs, setup_logger | ||||||
|
||||||
|
@@ -45,16 +47,10 @@ def init( | |||||
party: str = None, | ||||||
tls_config: Dict = None, | ||||||
logging_level: str = 'info', | ||||||
cross_silo_grpc_retry_policy: Dict = None, | ||||||
cross_silo_send_max_retries: int = None, | ||||||
cross_silo_serializing_allowed_list: Dict = None, | ||||||
cross_silo_send_resource_label: Dict = None, | ||||||
cross_silo_recv_resource_label: Dict = None, | ||||||
exit_on_failure_cross_silo_sending: bool = False, | ||||||
cross_silo_messages_max_size_in_bytes: int = None, | ||||||
cross_silo_timeout_in_seconds: int = 60, | ||||||
enable_waiting_for_other_parties_ready: bool = False, | ||||||
grpc_metadata: Dict = None, | ||||||
send_proxy_cls: SendProxy = None, | ||||||
recv_proxy_cls: RecvProxy = None, | ||||||
global_cross_silo_msg_config: Optional[CrossSiloMsgConfig] = None, | ||||||
**kwargs, | ||||||
): | ||||||
""" | ||||||
|
@@ -71,20 +67,15 @@ def init( | |||||
# (Optional) the listen address, the `address` will be | ||||||
# used if not provided. | ||||||
'listen_addr': '0.0.0.0:10001', | ||||||
# (Optional) The party specific metadata sent with the grpc request | ||||||
'grpc_metadata': (('token', 'alice-token'),), | ||||||
'grpc_options': [ | ||||||
('grpc.default_authority', 'alice'), | ||||||
('grpc.max_send_message_length', 50 * 1024 * 1024) | ||||||
] | ||||||
'cross_silo_comm_config': CrossSiloMsgConfig | ||||||
}, | ||||||
'bob': { | ||||||
# The address for other parties. | ||||||
'address': '127.0.0.1:10002', | ||||||
# (Optional) the listen address, the `address` will be | ||||||
# used if not provided. | ||||||
'listen_addr': '0.0.0.0:10002', | ||||||
# (Optional) The party specific metadata sent with the grpc request | ||||||
# (Optional) The party specific metadata sent with grpc requests | ||||||
'grpc_metadata': (('token', 'bob-token'),), | ||||||
}, | ||||||
'carol': { | ||||||
|
@@ -93,7 +84,7 @@ def init( | |||||
# (Optional) the listen address, the `address` will be | ||||||
# used if not provided. | ||||||
'listen_addr': '0.0.0.0:10003', | ||||||
# (Optional) The party specific metadata sent with the grpc request | ||||||
# (Optional) The party specific metadata sent with grpc requests | ||||||
'grpc_metadata': (('token', 'carol-token'),), | ||||||
}, | ||||||
} | ||||||
|
@@ -116,48 +107,13 @@ def init( | |||||
"cert": "bob's server cert", | ||||||
"key": "bob's server cert key", | ||||||
} | ||||||
|
||||||
logging_level: optional; the logging level, could be `debug`, `info`, | ||||||
`warning`, `error`, `critical`, not case sensititive. | ||||||
cross_silo_grpc_retry_policy: a dict descibes the retry policy for | ||||||
cross silo rpc call. If None, the following default retry policy | ||||||
will be used. More details please refer to | ||||||
`retry-policy <https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy>`_. # noqa | ||||||
|
||||||
.. code:: python | ||||||
{ | ||||||
"maxAttempts": 4, | ||||||
"initialBackoff": "0.1s", | ||||||
"maxBackoff": "1s", | ||||||
"backoffMultiplier": 2, | ||||||
"retryableStatusCodes": [ | ||||||
"UNAVAILABLE" | ||||||
] | ||||||
} | ||||||
cross_silo_send_max_retries: the max retries for sending data cross silo. | ||||||
cross_silo_serializing_allowed_list: The package or class list allowed for | ||||||
serializing(deserializating) cross silos. It's used for avoiding pickle | ||||||
deserializing execution attack when crossing solis. | ||||||
cross_silo_send_resource_label: Customized resource label, the SendProxyActor | ||||||
will be scheduled based on the declared resource label. For example, | ||||||
when setting to `{"my_label": 1}`, then the SendProxyActor will be started | ||||||
only on Nodes with `{"resource": {"my_label": $NUM}}` where $NUM >= 1. | ||||||
cross_silo_recv_resource_label: Customized resource label, the RecverProxyActor | ||||||
will be scheduled based on the declared resource label. For example, | ||||||
when setting to `{"my_label": 1}`, then the RecverProxyActor will be started | ||||||
only on Nodes with `{"resource": {"my_label": $NUM}}` where $NUM >= 1. | ||||||
exit_on_failure_cross_silo_sending: whether exit when failure on | ||||||
cross-silo sending. If True, a SIGTERM will be signaled to self | ||||||
if failed to sending cross-silo data. | ||||||
cross_silo_messages_max_size_in_bytes: The maximum length in bytes of | ||||||
cross-silo messages. | ||||||
If None, the default value of 500 MB is specified. | ||||||
cross_silo_timeout_in_seconds: The timeout in seconds of a cross-silo RPC call. | ||||||
It's 60 by default. | ||||||
enable_waiting_for_other_parties_ready: ping other parties until they | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does not match the code There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not quite understand. |
||||||
are all ready if True. | ||||||
grpc_metadata: optional; The metadata sent with the grpc request. This won't override | ||||||
basic tcp headers, such as `user-agent`, but aggregate them together. | ||||||
global_cross_silo_msg_config: Global cross-silo message related | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for that I still don't get what the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because the |
||||||
configs that are applied to all connections. Supported configs | ||||||
can refer to CrossSiloMsgConfig in config.py. | ||||||
|
||||||
Examples: | ||||||
>>> import fed | ||||||
|
@@ -182,22 +138,21 @@ def init( | |||||
assert ( | ||||||
'cert' in tls_config and 'key' in tls_config | ||||||
), 'Cert or key are not in tls_config.' | ||||||
|
||||||
global_cross_silo_msg_config = \ | ||||||
global_cross_silo_msg_config or CrossSiloMsgConfig() | ||||||
# A Ray private accessing, should be replaced in public API. | ||||||
compatible_utils._init_internal_kv() | ||||||
|
||||||
cluster_config = { | ||||||
constants.KEY_OF_CLUSTER_ADDRESSES: cluster, | ||||||
constants.KEY_OF_CURRENT_PARTY_NAME: party, | ||||||
constants.KEY_OF_TLS_CONFIG: tls_config, | ||||||
constants.KEY_OF_CROSS_SILO_SERIALIZING_ALLOWED_LIST: | ||||||
cross_silo_serializing_allowed_list, | ||||||
constants.KEY_OF_CROSS_SILO_MESSAGES_MAX_SIZE_IN_BYTES: | ||||||
cross_silo_messages_max_size_in_bytes, | ||||||
constants.KEY_OF_CROSS_SILO_TIMEOUT_IN_SECONDS: cross_silo_timeout_in_seconds, | ||||||
} | ||||||
|
||||||
job_config = { | ||||||
constants.KEY_OF_GRPC_METADATA : grpc_metadata, | ||||||
constants.KEY_OF_CROSS_SILO_MSG_CONFIG: | ||||||
global_cross_silo_msg_config, | ||||||
} | ||||||
compatible_utils.kv.put(constants.KEY_OF_CLUSTER_CONFIG, | ||||||
cloudpickle.dumps(cluster_config)) | ||||||
|
@@ -215,30 +170,35 @@ def init( | |||||
|
||||||
logger.info(f'Started rayfed with {cluster_config}') | ||||||
get_global_context().get_cleanup_manager().start( | ||||||
exit_when_failure_sending=exit_on_failure_cross_silo_sending) | ||||||
exit_when_failure_sending=global_cross_silo_msg_config.exit_on_sending_failure) | ||||||
|
||||||
recv_actor_config = fed_config.ProxyActorConfig( | ||||||
resource_label=cross_silo_recv_resource_label) | ||||||
if recv_proxy_cls is None: | ||||||
logger.debug( | ||||||
"Not declaring recver proxy class, using `GrpcRecvProxy` as default.") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
more native? |
||||||
from fed.proxy.grpc.grpc_proxy import GrpcRecvProxy | ||||||
recv_proxy_cls = GrpcRecvProxy | ||||||
# Start recv proxy | ||||||
start_recv_proxy( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not related to this PR changes, but |
||||||
cluster=cluster, | ||||||
party=party, | ||||||
logging_level=logging_level, | ||||||
tls_config=tls_config, | ||||||
retry_policy=cross_silo_grpc_retry_policy, | ||||||
actor_config=recv_actor_config | ||||||
proxy_cls=recv_proxy_cls, | ||||||
proxy_config=global_cross_silo_msg_config | ||||||
) | ||||||
|
||||||
send_actor_config = fed_config.ProxyActorConfig( | ||||||
resource_label=cross_silo_send_resource_label) | ||||||
if send_proxy_cls is None: | ||||||
logger.debug( | ||||||
"Not declaring send proxy class, using `GrpcSendProxy` as default.") | ||||||
from fed.proxy.grpc.grpc_proxy import GrpcSendProxy | ||||||
send_proxy_cls = GrpcSendProxy | ||||||
start_send_proxy( | ||||||
cluster=cluster, | ||||||
party=party, | ||||||
logging_level=logging_level, | ||||||
tls_config=tls_config, | ||||||
retry_policy=cross_silo_grpc_retry_policy, | ||||||
max_retries=cross_silo_send_max_retries, | ||||||
actor_config=send_actor_config | ||||||
proxy_cls=send_proxy_cls, | ||||||
proxy_config=global_cross_silo_msg_config | ||||||
) | ||||||
|
||||||
if enable_waiting_for_other_parties_ready: | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -7,7 +7,10 @@ | |||||
import fed._private.compatible_utils as compatible_utils | ||||||
import fed._private.constants as fed_constants | ||||||
import cloudpickle | ||||||
from typing import Dict, Optional | ||||||
import json | ||||||
|
||||||
from typing import Dict, List, Optional | ||||||
from dataclasses import dataclass | ||||||
|
||||||
|
||||||
class ClusterConfig: | ||||||
|
@@ -27,18 +30,6 @@ def current_party(self): | |||||
def tls_config(self): | ||||||
return self._data[fed_constants.KEY_OF_TLS_CONFIG] | ||||||
|
||||||
@property | ||||||
def serializing_allowed_list(self): | ||||||
return self._data[fed_constants.KEY_OF_CROSS_SILO_SERIALIZING_ALLOWED_LIST] | ||||||
|
||||||
@property | ||||||
def cross_silo_timeout(self): | ||||||
return self._data[fed_constants.KEY_OF_CROSS_SILO_TIMEOUT_IN_SECONDS] | ||||||
|
||||||
@property | ||||||
def cross_silo_messages_max_size(self): | ||||||
return self._data[fed_constants.KEY_OF_CROSS_SILO_MESSAGES_MAX_SIZE_IN_BYTES] | ||||||
|
||||||
|
||||||
class JobConfig: | ||||||
def __init__(self, raw_bytes: bytes) -> None: | ||||||
|
@@ -48,8 +39,10 @@ def __init__(self, raw_bytes: bytes) -> None: | |||||
self._data = cloudpickle.loads(raw_bytes) | ||||||
|
||||||
@property | ||||||
def grpc_metadata(self): | ||||||
return self._data.get(fed_constants.KEY_OF_GRPC_METADATA, {}) | ||||||
def cross_silo_comm_config(self): | ||||||
NKcqx marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
return self._data.get( | ||||||
fed_constants.KEY_OF_CROSS_SILO_MSG_CONFIG, | ||||||
CrossSiloMsgConfig()) | ||||||
|
||||||
|
||||||
# A module level cache for the cluster configurations. | ||||||
|
@@ -80,14 +73,94 @@ def get_job_config(): | |||||
return _job_config | ||||||
|
||||||
|
||||||
class ProxyActorConfig: | ||||||
NKcqx marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
@dataclass | ||||||
class CrossSiloMsgConfig: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
"""A class to store parameters used for Proxy Actor | ||||||
|
||||||
Attributes: | ||||||
resource_label: The customized resources for the actor. This will be | ||||||
filled into the "resource" field of Ray ActorClass.options. | ||||||
proxy_max_restarts: The max restart times for the send proxy. | ||||||
serializing_allowed_list: The package or class list allowed for | ||||||
serializing(deserializating) cross silos. It's used for avoiding pickle | ||||||
deserializing execution attack when crossing solis. | ||||||
send_resource_label: Customized resource label, the SendProxyActor | ||||||
will be scheduled based on the declared resource label. For example, | ||||||
when setting to `{"my_label": 1}`, then the SendProxyActor will be started | ||||||
only on Nodes with `{"resource": {"my_label": $NUM}}` where $NUM >= 1. | ||||||
recv_resource_label: Customized resource label, the RecverProxyActor | ||||||
will be scheduled based on the declared resource label. For example, | ||||||
when setting to `{"my_label": 1}`, then the RecverProxyActor will be started | ||||||
only on Nodes with `{"resource": {"my_label": $NUM}}` where $NUM >= 1. | ||||||
exit_on_sending_failure: whether exit when failure on | ||||||
cross-silo sending. If True, a SIGTERM will be signaled to self | ||||||
if failed to sending cross-silo data. | ||||||
messages_max_size_in_bytes: The maximum length in bytes of | ||||||
NKcqx marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
cross-silo messages. | ||||||
If None, the default value of 500 MB is specified. | ||||||
timeout_in_ms: The timeout in mili-seconds of a cross-silo RPC call. | ||||||
It's 60000 by default. | ||||||
http_header: The HTTP header, e.g. metadata in grpc, sent with the RPC request. | ||||||
This won't override basic tcp headers, such as `user-agent`, but concat | ||||||
them together. | ||||||
""" | ||||||
proxy_max_restarts: int = None | ||||||
timeout_in_ms: int = 60000 | ||||||
messages_max_size_in_bytes: int = None | ||||||
exit_on_sending_failure: Optional[bool] = False | ||||||
serializing_allowed_list: Optional[Dict[str, str]] = None | ||||||
send_resource_label: Optional[Dict[str, str]] = None | ||||||
recv_resource_label: Optional[Dict[str, str]] = None | ||||||
http_header: Optional[Dict[str, str]] = None | ||||||
|
||||||
def __json__(self): | ||||||
return json.dumps(self.__dict__) | ||||||
|
||||||
@classmethod | ||||||
def from_json(cls, json_str): | ||||||
data = json.loads(json_str) | ||||||
return cls(**data) | ||||||
|
||||||
@classmethod | ||||||
def from_dict(cls, data: Dict): | ||||||
"""Initialize CrossSiloMsgConfig from a dictionary. | ||||||
|
||||||
Args: | ||||||
data (Dict): Dictionary with keys as member variable names. | ||||||
|
||||||
Returns: | ||||||
CrossSiloMsgConfig: An instance of CrossSiloMsgConfig. | ||||||
""" | ||||||
# Get the attributes of the class | ||||||
attrs = {attr for attr, _ in cls.__annotations__.items()} | ||||||
# Filter the dictionary to only include keys that are attributes of the class | ||||||
filtered_data = {key: value for key, value in data.items() if key in attrs} | ||||||
return cls(**filtered_data) | ||||||
|
||||||
|
||||||
@dataclass | ||||||
class GrpcCrossSiloMsgConfig(CrossSiloMsgConfig): | ||||||
"""A class to store parameters used for GRPC communication | ||||||
|
||||||
Attributes: | ||||||
grpc_retry_policy: a dict descibes the retry policy for | ||||||
cross silo rpc call. If None, the following default retry policy | ||||||
will be used. More details please refer to | ||||||
`retry-policy <https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy>`_. # noqa | ||||||
|
||||||
.. code:: python | ||||||
{ | ||||||
"maxAttempts": 4, | ||||||
"initialBackoff": "0.1s", | ||||||
"maxBackoff": "1s", | ||||||
"backoffMultiplier": 2, | ||||||
"retryableStatusCodes": [ | ||||||
"UNAVAILABLE" | ||||||
] | ||||||
} | ||||||
grpc_channel_options: A list of tuples to store GRPC channel options, | ||||||
e.g. [ | ||||||
('grpc.enable_retries', 1), | ||||||
('grpc.max_send_message_length', 50 * 1024 * 1024) | ||||||
] | ||||||
""" | ||||||
def __init__( | ||||||
self, | ||||||
resource_label: Optional[Dict[str, str]] = None) -> None: | ||||||
self.resource_label = resource_label | ||||||
grpc_channel_options: List = None | ||||||
grpc_retry_policy: Dict[str, str] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.