From 7d62dc964ac5d87952b5a59d8472db28a5b1d4de Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Thu, 10 Aug 2023 14:58:28 +0800 Subject: [PATCH 01/24] WIP --- fed/_private/fed_actor.py | 102 +++++++++++++++++++++++++++++++------- 1 file changed, 83 insertions(+), 19 deletions(-) diff --git a/fed/_private/fed_actor.py b/fed/_private/fed_actor.py index aa73bfa..2092b3e 100644 --- a/fed/_private/fed_actor.py +++ b/fed/_private/fed_actor.py @@ -39,20 +39,57 @@ def __init__( self._options = options self._actor_handle = None +# # ray.remote() ray public +# # def f: +# # ray.remote(f) == f.remote() +# # ray.remote(A) == A.remote() +# # a.f.remote() == ray.remote(a.f) placehold +# # a.f ---> ray object + +# def __getattr__(m: native_method): +# if m.party() == current_party: +# ray_actor_handle == self._ray_actor_handle +# ray_method = ray_actor_handle.__getattr__(m) +# return ray_method +# else: +# # placehold + def __getattr__(self, method_name: str): # User trying to call .bind() without a bind class method if method_name == "remote" and "remote" not in dir(self._body): raise AttributeError(f".remote() cannot be used again on {type(self)} ") # Raise an error if the method is invalid. - getattr(self._body, method_name) - call_node = FedActorMethod( - self._addresses, - self._party, - self._node_party, - self, - method_name, - ).options(**self._options) - return call_node + m = getattr(self._body, method_name) + + # call_node = FedActorMethod( + # self._addresses, + # self._party, + # self._node_party, + # self, + # method_name, + # ).options(**self._options) + + if self._party == self._node_party: + ray_actor_handle = self._actor_handle # rename to _ray_actor_handle + ray_wrappered_method = ray_actor_handle.__getattr__(m) + return FedActorMethod( + self._addresses, + self._party, + self._node_party, + self, + method_name, + ray_wrappered_method, + ).options(**self._options) + else: + return FedActorMethod( + self._addresses, + self._party, + self._node_party, + self, + method_name, + None, + ).options(**self._options) + def _execute_impl(self, cls_args, cls_kwargs): """Executor of ClassNode by ray.remote() @@ -69,21 +106,27 @@ def _execute_impl(self, cls_args, cls_kwargs): .remote(*cls_args, **cls_kwargs) ) - def _execute_remote_method(self, method_name, options, args, kwargs): + def _execute_remote_method(self, method_name, options, _ray_wrappered_method, args, kwargs): num_returns = 1 if options and 'num_returns' in options: num_returns = options['num_returns'] logger.debug( f"Actor method call: {method_name}, num_returns: {num_returns}" ) - ray_object_ref = self._actor_handle._actor_method_call( - method_name, - args=args, - kwargs=kwargs, - name="", - num_returns=num_returns, - concurrency_group_name="", - ) + + # a.f.remote() -> grpc client_server + # a.f + + ray_object_ref = ray.remote(_ray_wrappered_method) + + # ray_object_ref = self._actor_handle._actor_method_call( + # method_name, + # args=args, + # kwargs=kwargs, + # name="", + # num_returns=num_returns, + # concurrency_group_name="", + # ) return ray_object_ref @@ -95,6 +138,7 @@ def __init__( node_party, fed_actor_handle, method_name, + ray_wrappered_method, ) -> None: self._addresses = addresses self._party = party # Current party @@ -102,6 +146,7 @@ def __init__( self._fed_actor_handle = fed_actor_handle self._method_name = method_name self._options = {} + self._ray_wrappered_method = ray_wrappered_method self._fed_call_holder = FedCallHolder(node_party, self._execute_impl) def remote(self, *args, **kwargs) -> FedObject: @@ -112,7 +157,26 @@ def options(self, **options): self._fed_call_holder.options(**options) return self +# # ray.remote() ray public +# # def f: +# # ray.remote(f) == f.remote() +# # ray.remote(A) == A.remote() +# # a.f.remote() == ray.remote(a.f) placehold +# # a.f ---> ray object + +# def __getattr__(m: native_method): +# if m.party() == current_party: +# ray_actor_handle == self._ray_actor_handle +# ray_method = ray_actor_handle.__getattr__(m) +# return ray_method +# else: +# # placehold + +# a.f.remote() + + def _execute_impl(self, args, kwargs): + return self._fed_actor_handle._execute_remote_method( - self._method_name, self._options, args, kwargs + self._method_name, self._options, self._ray_wrappered_method, args, kwargs ) From 077a7154725c7badfd797ecf57d0aee913ca25ae Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Thu, 10 Aug 2023 16:28:46 +0800 Subject: [PATCH 02/24] WIP --- fed/_private/fed_actor.py | 5 +++-- fed/api.py | 5 +++++ tests/test_fed_get.py | 2 ++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/fed/_private/fed_actor.py b/fed/_private/fed_actor.py index 2092b3e..4df3b29 100644 --- a/fed/_private/fed_actor.py +++ b/fed/_private/fed_actor.py @@ -71,7 +71,7 @@ def __getattr__(self, method_name: str): if self._party == self._node_party: ray_actor_handle = self._actor_handle # rename to _ray_actor_handle - ray_wrappered_method = ray_actor_handle.__getattr__(m) + ray_wrappered_method = ray_actor_handle.__getattribute__(method_name) return FedActorMethod( self._addresses, self._party, @@ -117,7 +117,8 @@ def _execute_remote_method(self, method_name, options, _ray_wrappered_method, ar # a.f.remote() -> grpc client_server # a.f - ray_object_ref = ray.remote(_ray_wrappered_method) + # ray_object_ref = ray.remote(_ray_wrappered_method) + ray_object_ref = _ray_wrappered_method.remote(*args, **kwargs) # ray_object_ref = self._actor_handle._actor_method_call( # method_name, diff --git a/fed/api.py b/fed/api.py index 19950db..893d0ea 100644 --- a/fed/api.py +++ b/fed/api.py @@ -270,11 +270,16 @@ def _execute_impl(self, args, kwargs): ray.remote(self._func_body).options(**self._options).remote(*args, **kwargs) ) +# class A: +# pass + +# self._ray_actor_handle = ray.remote(A).remote() class FedRemoteClass: def __init__(self, func_or_class) -> None: self._party = None self._cls = func_or_class + # self._client_actor_handle_class = ray. self._options = {} def party(self, party: str): diff --git a/tests/test_fed_get.py b/tests/test_fed_get.py index 3aaf71d..8b18b04 100644 --- a/tests/test_fed_get.py +++ b/tests/test_fed_get.py @@ -62,6 +62,8 @@ def run(party): for epoch in range(epochs): w1 = alice_model.train.remote() w2 = bob_model.train.remote() + print(f"==========================") + print(f"==========================") new_weights = mean.party("alice").remote(w1, w2) result = fed.get(new_weights) alice_model.set_weights.remote(new_weights) From 814c0fea7a0b9f77da162739fdddefb0293d7fcd Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Thu, 10 Aug 2023 17:30:03 +0800 Subject: [PATCH 03/24] WIP --- tests/test_fed_get.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_fed_get.py b/tests/test_fed_get.py index 8b18b04..bb9ad85 100644 --- a/tests/test_fed_get.py +++ b/tests/test_fed_get.py @@ -47,7 +47,8 @@ def mean(x, y): def run(party): - compatible_utils.init_ray(address='local') + address = '127.0.0.1:11012' if party == 'alice' else '127.0.0.1:11011' + compatible_utils.init_ray(address=address) addresses = { 'alice': '127.0.0.1:11012', 'bob': '127.0.0.1:11011', From d3410c0f3d1721d24440e494893bc519b05b05ab Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Fri, 11 Aug 2023 16:16:47 +0800 Subject: [PATCH 04/24] Manually test be green. --- fed/_private/fed_actor.py | 18 ++++++++---------- tests/test_fed_get.py | 14 +++++++++----- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/fed/_private/fed_actor.py b/fed/_private/fed_actor.py index 4df3b29..9b60e2c 100644 --- a/fed/_private/fed_actor.py +++ b/fed/_private/fed_actor.py @@ -59,19 +59,17 @@ def __getattr__(self, method_name: str): if method_name == "remote" and "remote" not in dir(self._body): raise AttributeError(f".remote() cannot be used again on {type(self)} ") # Raise an error if the method is invalid. - m = getattr(self._body, method_name) - - # call_node = FedActorMethod( - # self._addresses, - # self._party, - # self._node_party, - # self, - # method_name, - # ).options(**self._options) + getattr(self._body, method_name) if self._party == self._node_party: ray_actor_handle = self._actor_handle # rename to _ray_actor_handle - ray_wrappered_method = ray_actor_handle.__getattribute__(method_name) + try: + ray_wrappered_method = ray_actor_handle.__getattribute__(method_name) + except AttributeError: + # The code path in Ray client mode. + assert isinstance(ray_actor_handle, ray.util.client.common.ClientActorHandle) + ray_wrappered_method = ray_actor_handle.__getattr__(method_name) + return FedActorMethod( self._addresses, self._party, diff --git a/tests/test_fed_get.py b/tests/test_fed_get.py index bb9ad85..a160bf4 100644 --- a/tests/test_fed_get.py +++ b/tests/test_fed_get.py @@ -47,11 +47,17 @@ def mean(x, y): def run(party): - address = '127.0.0.1:11012' if party == 'alice' else '127.0.0.1:11011' + import time + if party == 'alice': + time.sleep(1.4) + + address = 'ray://127.0.0.1:21012' if party == 'alice' else 'ray://127.0.0.1:21011' compatible_utils.init_ray(address=address) + + addresses = { - 'alice': '127.0.0.1:11012', - 'bob': '127.0.0.1:11011', + 'alice': '127.0.0.1:31012', + 'bob': '127.0.0.1:31011', } fed.init(addresses=addresses, party=party) @@ -63,8 +69,6 @@ def run(party): for epoch in range(epochs): w1 = alice_model.train.remote() w2 = bob_model.train.remote() - print(f"==========================") - print(f"==========================") new_weights = mean.party("alice").remote(w1, w2) result = fed.get(new_weights) alice_model.set_weights.remote(new_weights) From 272a5fbb0b704dca6128b0df5dee5f7a05987498 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Fri, 11 Aug 2023 16:22:57 +0800 Subject: [PATCH 05/24] LINT --- fed/_private/fed_actor.py | 53 +++------------------------------------ fed/api.py | 4 --- 2 files changed, 3 insertions(+), 54 deletions(-) diff --git a/fed/_private/fed_actor.py b/fed/_private/fed_actor.py index 9b60e2c..566e184 100644 --- a/fed/_private/fed_actor.py +++ b/fed/_private/fed_actor.py @@ -37,22 +37,7 @@ def __init__( self._party = party self._node_party = node_party self._options = options - self._actor_handle = None - -# # ray.remote() ray public -# # def f: -# # ray.remote(f) == f.remote() -# # ray.remote(A) == A.remote() -# # a.f.remote() == ray.remote(a.f) placehold -# # a.f ---> ray object - -# def __getattr__(m: native_method): -# if m.party() == current_party: -# ray_actor_handle == self._ray_actor_handle -# ray_method = ray_actor_handle.__getattr__(m) -# return ray_method -# else: -# # placehold + self._ray_actor_handle = None def __getattr__(self, method_name: str): # User trying to call .bind() without a bind class method @@ -62,7 +47,7 @@ def __getattr__(self, method_name: str): getattr(self._body, method_name) if self._party == self._node_party: - ray_actor_handle = self._actor_handle # rename to _ray_actor_handle + ray_actor_handle = self._ray_actor_handle try: ray_wrappered_method = ray_actor_handle.__getattribute__(method_name) except AttributeError: @@ -112,21 +97,7 @@ def _execute_remote_method(self, method_name, options, _ray_wrappered_method, ar f"Actor method call: {method_name}, num_returns: {num_returns}" ) - # a.f.remote() -> grpc client_server - # a.f - - # ray_object_ref = ray.remote(_ray_wrappered_method) - ray_object_ref = _ray_wrappered_method.remote(*args, **kwargs) - - # ray_object_ref = self._actor_handle._actor_method_call( - # method_name, - # args=args, - # kwargs=kwargs, - # name="", - # num_returns=num_returns, - # concurrency_group_name="", - # ) - return ray_object_ref + return _ray_wrappered_method.remote(*args, **kwargs) class FedActorMethod: @@ -156,24 +127,6 @@ def options(self, **options): self._fed_call_holder.options(**options) return self -# # ray.remote() ray public -# # def f: -# # ray.remote(f) == f.remote() -# # ray.remote(A) == A.remote() -# # a.f.remote() == ray.remote(a.f) placehold -# # a.f ---> ray object - -# def __getattr__(m: native_method): -# if m.party() == current_party: -# ray_actor_handle == self._ray_actor_handle -# ray_method = ray_actor_handle.__getattr__(m) -# return ray_method -# else: -# # placehold - -# a.f.remote() - - def _execute_impl(self, args, kwargs): return self._fed_actor_handle._execute_remote_method( diff --git a/fed/api.py b/fed/api.py index 893d0ea..14bdaa0 100644 --- a/fed/api.py +++ b/fed/api.py @@ -270,10 +270,6 @@ def _execute_impl(self, args, kwargs): ray.remote(self._func_body).options(**self._options).remote(*args, **kwargs) ) -# class A: -# pass - -# self._ray_actor_handle = ray.remote(A).remote() class FedRemoteClass: def __init__(self, func_or_class) -> None: From 65adee1b2425b68a6cc1cf55b2bcd5c17bb37146 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Fri, 11 Aug 2023 17:29:42 +0800 Subject: [PATCH 06/24] import --- fed/_private/fed_actor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fed/_private/fed_actor.py b/fed/_private/fed_actor.py index 566e184..7fd4b87 100644 --- a/fed/_private/fed_actor.py +++ b/fed/_private/fed_actor.py @@ -15,6 +15,7 @@ import logging import ray +from ray.util.client.common import ClientActorHandle from fed._private.fed_call_holder import FedCallHolder from fed.fed_object import FedObject @@ -52,7 +53,7 @@ def __getattr__(self, method_name: str): ray_wrappered_method = ray_actor_handle.__getattribute__(method_name) except AttributeError: # The code path in Ray client mode. - assert isinstance(ray_actor_handle, ray.util.client.common.ClientActorHandle) + assert isinstance(ray_actor_handle, ClientActorHandle) ray_wrappered_method = ray_actor_handle.__getattr__(method_name) return FedActorMethod( From 37091697eafcccdf7a06b15391bb12da723e532c Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 14 Aug 2023 11:27:37 +0800 Subject: [PATCH 07/24] Fix CI --- fed/_private/fed_actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fed/_private/fed_actor.py b/fed/_private/fed_actor.py index 7fd4b87..61d1b99 100644 --- a/fed/_private/fed_actor.py +++ b/fed/_private/fed_actor.py @@ -84,7 +84,7 @@ def _execute_impl(self, cls_args, cls_kwargs): current node is executed. """ if self._node_party == self._party: - self._actor_handle = ( + self._ray_actor_handle = ( ray.remote(self._body) .options(**self._options) .remote(*cls_args, **cls_kwargs) From ffd8cdbb7f7d5bd3471133d5d59a5e6f4e6ef9b6 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 14 Aug 2023 14:14:27 +0800 Subject: [PATCH 08/24] Workaround --- tests/test_fed_get.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_fed_get.py b/tests/test_fed_get.py index a160bf4..9ec361b 100644 --- a/tests/test_fed_get.py +++ b/tests/test_fed_get.py @@ -52,7 +52,8 @@ def run(party): time.sleep(1.4) address = 'ray://127.0.0.1:21012' if party == 'alice' else 'ray://127.0.0.1:21011' - compatible_utils.init_ray(address=address) + # compatible_utils.init_ray(address=address) + compatible_utils.init_ray(address='local') addresses = { From 09ec2ce0b8e177f0d582f7d7c15730191760fbfe Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 14 Aug 2023 15:43:36 +0800 Subject: [PATCH 09/24] Fix test_options --- fed/_private/fed_actor.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/fed/_private/fed_actor.py b/fed/_private/fed_actor.py index 61d1b99..d4080ec 100644 --- a/fed/_private/fed_actor.py +++ b/fed/_private/fed_actor.py @@ -98,7 +98,13 @@ def _execute_remote_method(self, method_name, options, _ray_wrappered_method, ar f"Actor method call: {method_name}, num_returns: {num_returns}" ) - return _ray_wrappered_method.remote(*args, **kwargs) + return _ray_wrappered_method.options( + name='', + num_returns=num_returns, + ).remote( + *args, + **kwargs, + ) class FedActorMethod: From 78074266c124843321934b4191598c60c5798171 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Mon, 14 Aug 2023 16:32:15 +0800 Subject: [PATCH 10/24] LINT --- fed/_private/fed_actor.py | 11 ++++++++--- tests/test_fed_get.py | 7 +++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/fed/_private/fed_actor.py b/fed/_private/fed_actor.py index d4080ec..dd88057 100644 --- a/fed/_private/fed_actor.py +++ b/fed/_private/fed_actor.py @@ -74,7 +74,6 @@ def __getattr__(self, method_name: str): None, ).options(**self._options) - def _execute_impl(self, cls_args, cls_kwargs): """Executor of ClassNode by ray.remote() @@ -90,7 +89,14 @@ def _execute_impl(self, cls_args, cls_kwargs): .remote(*cls_args, **cls_kwargs) ) - def _execute_remote_method(self, method_name, options, _ray_wrappered_method, args, kwargs): + def _execute_remote_method( + self, + method_name, + options, + _ray_wrappered_method, + args, + kwargs, + ): num_returns = 1 if options and 'num_returns' in options: num_returns = options['num_returns'] @@ -135,7 +141,6 @@ def options(self, **options): return self def _execute_impl(self, args, kwargs): - return self._fed_actor_handle._execute_remote_method( self._method_name, self._options, self._ray_wrappered_method, args, kwargs ) diff --git a/tests/test_fed_get.py b/tests/test_fed_get.py index 9ec361b..5752f77 100644 --- a/tests/test_fed_get.py +++ b/tests/test_fed_get.py @@ -50,12 +50,11 @@ def run(party): import time if party == 'alice': time.sleep(1.4) - - address = 'ray://127.0.0.1:21012' if party == 'alice' else 'ray://127.0.0.1:21011' + + # address = 'ray://127.0.0.1:21012' if party == 'alice' else 'ray://127.0.0.1:21011' # noqa # compatible_utils.init_ray(address=address) compatible_utils.init_ray(address='local') - - + addresses = { 'alice': '127.0.0.1:31012', 'bob': '127.0.0.1:31011', From 4ffb10cf26d525f4e447246433b699cf805985a9 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 15:52:47 +0800 Subject: [PATCH 11/24] Add unit test. --- fed/api.py | 1 - fed/utils.py | 12 ++ .../test_basic_client_mode.py | 111 ++++++++++++++++++ 3 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 tests/client_mode_tests/test_basic_client_mode.py diff --git a/fed/api.py b/fed/api.py index 14bdaa0..19950db 100644 --- a/fed/api.py +++ b/fed/api.py @@ -275,7 +275,6 @@ class FedRemoteClass: def __init__(self, func_or_class) -> None: self._party = None self._cls = func_or_class - # self._client_actor_handle_class = ray. self._options = {} def party(self, party: str): diff --git a/fed/utils.py b/fed/utils.py index 481fcfd..62ca02f 100644 --- a/fed/utils.py +++ b/fed/utils.py @@ -15,6 +15,7 @@ import logging import re import sys +import subprocess import ray @@ -223,3 +224,14 @@ def validate_addresses(addresses: dict): isinstance(address, str) and address ), f'Address should be string but got {address}.' validate_address(address) + + +def start_command(command: str, timeout=60) : + """ + A util to start a shell command. + """ + process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + output, error = process.communicate(timeout=timeout) + if len(error) != 0: + raise RuntimeError(f'Failed to start command [{command}], the error is:\n {error.decode()}') + return output diff --git a/tests/client_mode_tests/test_basic_client_mode.py b/tests/client_mode_tests/test_basic_client_mode.py new file mode 100644 index 0000000..0c7a35f --- /dev/null +++ b/tests/client_mode_tests/test_basic_client_mode.py @@ -0,0 +1,111 @@ +# Copyright 2023 The RayFed Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import multiprocessing + +import pytest +import ray +import fed +import fed._private.compatible_utils as compatible_utils +import fed.utils as fed_utils + + +@fed.remote +class MyModel: + def __init__(self, party, step_length): + self._trained_steps = 0 + self._step_length = step_length + self._weights = 0 + self._party = party + + def train(self): + self._trained_steps += 1 + self._weights += self._step_length + return self._weights + + def get_weights(self): + return self._weights + + def set_weights(self, new_weights): + self._weights = new_weights + return new_weights + + +@fed.remote +def mean(x, y): + return (x + y) / 2 + + +def run(party): + import time + if party == 'alice': + time.sleep(1.4) + + address = 'ray://127.0.0.1:21012' if party == 'alice' else 'ray://127.0.0.1:21011' # noqa + compatible_utils.init_ray(address=address) + # compatible_utils.init_ray(address='local') + + addresses = { + 'alice': '127.0.0.1:31012', + 'bob': '127.0.0.1:31011', + } + fed.init(addresses=addresses, party=party) + + epochs = 3 + alice_model = MyModel.party("alice").remote("alice", 2) + bob_model = MyModel.party("bob").remote("bob", 4) + + all_mean_weights = [] + for epoch in range(epochs): + w1 = alice_model.train.remote() + w2 = bob_model.train.remote() + new_weights = mean.party("alice").remote(w1, w2) + result = fed.get(new_weights) + alice_model.set_weights.remote(new_weights) + bob_model.set_weights.remote(new_weights) + all_mean_weights.append(result) + assert all_mean_weights == [3, 6, 9] + latest_weights = fed.get( + [alice_model.get_weights.remote(), bob_model.get_weights.remote()] + ) + assert latest_weights == [9, 9] + fed.shutdown() + ray.shutdown() + + +def test_fed_get_in_2_parties(): + # Start 2 Ray clusters. + output1 = fed_utils.start_command('ray start --head --port=41012 --ray-client-server-port=21012 --dashboard-port=9112') + import time + time.sleep(5) + try: + output2 = fed_utils.start_command('ray start --head --port=41011 --ray-client-server-port=21011 --dashboard-port=9111') + except RuntimeError as e: + # A successful case. + assert 'Overwriting previous Ray address' in str(e) + + p_alice = multiprocessing.Process(target=run, args=('alice',)) + p_bob = multiprocessing.Process(target=run, args=('bob',)) + p_alice.start() + p_bob.start() + p_alice.join() + p_bob.join() + assert p_alice.exitcode == 0 and p_bob.exitcode == 0 + # TODO(qwang): This should be added to fixture to let it be cleaned when failing. + fed_utils.start_command('ray stop --force') + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-sv", __file__])) From d046f278c2bbe032242fe27b1a6084674e4c189f Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 16:51:05 +0800 Subject: [PATCH 12/24] 1 --- .../test_basic_client_mode.py | 16 ++------ tests/test_utils.py | 37 +++++++++++++++++++ 2 files changed, 40 insertions(+), 13 deletions(-) create mode 100644 tests/test_utils.py diff --git a/tests/client_mode_tests/test_basic_client_mode.py b/tests/client_mode_tests/test_basic_client_mode.py index 0c7a35f..62d177a 100644 --- a/tests/client_mode_tests/test_basic_client_mode.py +++ b/tests/client_mode_tests/test_basic_client_mode.py @@ -18,7 +18,7 @@ import ray import fed import fed._private.compatible_utils as compatible_utils -import fed.utils as fed_utils +import fed.tests.test_utils as fed_test_utils @fed.remote @@ -54,7 +54,6 @@ def run(party): address = 'ray://127.0.0.1:21012' if party == 'alice' else 'ray://127.0.0.1:21011' # noqa compatible_utils.init_ray(address=address) - # compatible_utils.init_ray(address='local') addresses = { 'alice': '127.0.0.1:31012', @@ -84,16 +83,8 @@ def run(party): ray.shutdown() +@pytest.fixture(fed_test_utils.ray_client_mode_setup) def test_fed_get_in_2_parties(): - # Start 2 Ray clusters. - output1 = fed_utils.start_command('ray start --head --port=41012 --ray-client-server-port=21012 --dashboard-port=9112') - import time - time.sleep(5) - try: - output2 = fed_utils.start_command('ray start --head --port=41011 --ray-client-server-port=21011 --dashboard-port=9111') - except RuntimeError as e: - # A successful case. - assert 'Overwriting previous Ray address' in str(e) p_alice = multiprocessing.Process(target=run, args=('alice',)) p_bob = multiprocessing.Process(target=run, args=('bob',)) @@ -102,8 +93,7 @@ def test_fed_get_in_2_parties(): p_alice.join() p_bob.join() assert p_alice.exitcode == 0 and p_bob.exitcode == 0 - # TODO(qwang): This should be added to fixture to let it be cleaned when failing. - fed_utils.start_command('ray stop --force') + if __name__ == "__main__": import sys diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..be70402 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,37 @@ +import pytest +import time + +import fed.utils as fed_utils + + +def start_ray_cluster( + ray_port, + client_server_port, + dashboard_port, + ): + command = [ + 'ray', + 'start', + '--head', + f'--port={ray_port}', + f'--ray-client-server-port={client_server_port}', + f'--dashboard-port={dashboard_port}', + ] + command_str = str.join(command) + _ = fed_utils.start_command(command_str) + + +@pytest.fixture +def ray_client_mode_setup(): + # Start 2 Ray clusters. + start_ray_cluster(ray_port=41012, client_server_port=21012, dashboard_port=9112) + + time.sleep(1) + try: + start_ray_cluster(ray_port=41011, client_server_port=21011, dashboard_port=9111) + except RuntimeError as e: + # A successful case. + assert 'Overwriting previous Ray address' in str(e) + + yield + fed_utils.start_command('ray stop --force') From 0e88acbc6ea484f5bbcc9b46635305d2a47c55ac Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 16:52:29 +0800 Subject: [PATCH 13/24] Move tests --- {tests => fed/tests}/__init__.py | 0 {tests => fed/tests}/client_mode_tests/test_basic_client_mode.py | 0 .../tests}/serializations_tests/test_unpickle_with_whitelist.py | 0 {tests => fed/tests}/simple_example.py | 0 {tests => fed/tests}/test_api.py | 0 {tests => fed/tests}/test_async_startup_2_clusters.py | 0 {tests => fed/tests}/test_basic_pass_fed_objects.py | 0 {tests => fed/tests}/test_cache_fed_objects.py | 0 {tests => fed/tests}/test_enable_tls_across_parties.py | 0 {tests => fed/tests}/test_exit_on_failure_sending.py | 0 {tests => fed/tests}/test_fed_get.py | 0 {tests => fed/tests}/test_grpc_options_on_proxies.py | 0 {tests => fed/tests}/test_internal_kv.py | 0 {tests => fed/tests}/test_listening_address.py | 0 {tests => fed/tests}/test_options.py | 0 .../tests}/test_pass_fed_objects_in_containers_in_actor.py | 0 .../tests}/test_pass_fed_objects_in_containers_in_normal_tasks.py | 0 {tests => fed/tests}/test_ping_others.py | 0 {tests => fed/tests}/test_repeat_init.py | 0 {tests => fed/tests}/test_reset_context.py | 0 {tests => fed/tests}/test_retry_policy.py | 0 {tests => fed/tests}/test_setup_proxy_actor.py | 0 {tests => fed/tests}/test_transport_proxy.py | 0 {tests => fed/tests}/test_transport_proxy_tls.py | 0 {tests => fed/tests}/test_utils.py | 0 {tests => fed/tests}/without_ray_tests/test_tree_utils.py | 0 {tests => fed/tests}/without_ray_tests/test_utils.py | 0 27 files changed, 0 insertions(+), 0 deletions(-) rename {tests => fed/tests}/__init__.py (100%) rename {tests => fed/tests}/client_mode_tests/test_basic_client_mode.py (100%) rename {tests => fed/tests}/serializations_tests/test_unpickle_with_whitelist.py (100%) rename {tests => fed/tests}/simple_example.py (100%) rename {tests => fed/tests}/test_api.py (100%) rename {tests => fed/tests}/test_async_startup_2_clusters.py (100%) rename {tests => fed/tests}/test_basic_pass_fed_objects.py (100%) rename {tests => fed/tests}/test_cache_fed_objects.py (100%) rename {tests => fed/tests}/test_enable_tls_across_parties.py (100%) rename {tests => fed/tests}/test_exit_on_failure_sending.py (100%) rename {tests => fed/tests}/test_fed_get.py (100%) rename {tests => fed/tests}/test_grpc_options_on_proxies.py (100%) rename {tests => fed/tests}/test_internal_kv.py (100%) rename {tests => fed/tests}/test_listening_address.py (100%) rename {tests => fed/tests}/test_options.py (100%) rename {tests => fed/tests}/test_pass_fed_objects_in_containers_in_actor.py (100%) rename {tests => fed/tests}/test_pass_fed_objects_in_containers_in_normal_tasks.py (100%) rename {tests => fed/tests}/test_ping_others.py (100%) rename {tests => fed/tests}/test_repeat_init.py (100%) rename {tests => fed/tests}/test_reset_context.py (100%) rename {tests => fed/tests}/test_retry_policy.py (100%) rename {tests => fed/tests}/test_setup_proxy_actor.py (100%) rename {tests => fed/tests}/test_transport_proxy.py (100%) rename {tests => fed/tests}/test_transport_proxy_tls.py (100%) rename {tests => fed/tests}/test_utils.py (100%) rename {tests => fed/tests}/without_ray_tests/test_tree_utils.py (100%) rename {tests => fed/tests}/without_ray_tests/test_utils.py (100%) diff --git a/tests/__init__.py b/fed/tests/__init__.py similarity index 100% rename from tests/__init__.py rename to fed/tests/__init__.py diff --git a/tests/client_mode_tests/test_basic_client_mode.py b/fed/tests/client_mode_tests/test_basic_client_mode.py similarity index 100% rename from tests/client_mode_tests/test_basic_client_mode.py rename to fed/tests/client_mode_tests/test_basic_client_mode.py diff --git a/tests/serializations_tests/test_unpickle_with_whitelist.py b/fed/tests/serializations_tests/test_unpickle_with_whitelist.py similarity index 100% rename from tests/serializations_tests/test_unpickle_with_whitelist.py rename to fed/tests/serializations_tests/test_unpickle_with_whitelist.py diff --git a/tests/simple_example.py b/fed/tests/simple_example.py similarity index 100% rename from tests/simple_example.py rename to fed/tests/simple_example.py diff --git a/tests/test_api.py b/fed/tests/test_api.py similarity index 100% rename from tests/test_api.py rename to fed/tests/test_api.py diff --git a/tests/test_async_startup_2_clusters.py b/fed/tests/test_async_startup_2_clusters.py similarity index 100% rename from tests/test_async_startup_2_clusters.py rename to fed/tests/test_async_startup_2_clusters.py diff --git a/tests/test_basic_pass_fed_objects.py b/fed/tests/test_basic_pass_fed_objects.py similarity index 100% rename from tests/test_basic_pass_fed_objects.py rename to fed/tests/test_basic_pass_fed_objects.py diff --git a/tests/test_cache_fed_objects.py b/fed/tests/test_cache_fed_objects.py similarity index 100% rename from tests/test_cache_fed_objects.py rename to fed/tests/test_cache_fed_objects.py diff --git a/tests/test_enable_tls_across_parties.py b/fed/tests/test_enable_tls_across_parties.py similarity index 100% rename from tests/test_enable_tls_across_parties.py rename to fed/tests/test_enable_tls_across_parties.py diff --git a/tests/test_exit_on_failure_sending.py b/fed/tests/test_exit_on_failure_sending.py similarity index 100% rename from tests/test_exit_on_failure_sending.py rename to fed/tests/test_exit_on_failure_sending.py diff --git a/tests/test_fed_get.py b/fed/tests/test_fed_get.py similarity index 100% rename from tests/test_fed_get.py rename to fed/tests/test_fed_get.py diff --git a/tests/test_grpc_options_on_proxies.py b/fed/tests/test_grpc_options_on_proxies.py similarity index 100% rename from tests/test_grpc_options_on_proxies.py rename to fed/tests/test_grpc_options_on_proxies.py diff --git a/tests/test_internal_kv.py b/fed/tests/test_internal_kv.py similarity index 100% rename from tests/test_internal_kv.py rename to fed/tests/test_internal_kv.py diff --git a/tests/test_listening_address.py b/fed/tests/test_listening_address.py similarity index 100% rename from tests/test_listening_address.py rename to fed/tests/test_listening_address.py diff --git a/tests/test_options.py b/fed/tests/test_options.py similarity index 100% rename from tests/test_options.py rename to fed/tests/test_options.py diff --git a/tests/test_pass_fed_objects_in_containers_in_actor.py b/fed/tests/test_pass_fed_objects_in_containers_in_actor.py similarity index 100% rename from tests/test_pass_fed_objects_in_containers_in_actor.py rename to fed/tests/test_pass_fed_objects_in_containers_in_actor.py diff --git a/tests/test_pass_fed_objects_in_containers_in_normal_tasks.py b/fed/tests/test_pass_fed_objects_in_containers_in_normal_tasks.py similarity index 100% rename from tests/test_pass_fed_objects_in_containers_in_normal_tasks.py rename to fed/tests/test_pass_fed_objects_in_containers_in_normal_tasks.py diff --git a/tests/test_ping_others.py b/fed/tests/test_ping_others.py similarity index 100% rename from tests/test_ping_others.py rename to fed/tests/test_ping_others.py diff --git a/tests/test_repeat_init.py b/fed/tests/test_repeat_init.py similarity index 100% rename from tests/test_repeat_init.py rename to fed/tests/test_repeat_init.py diff --git a/tests/test_reset_context.py b/fed/tests/test_reset_context.py similarity index 100% rename from tests/test_reset_context.py rename to fed/tests/test_reset_context.py diff --git a/tests/test_retry_policy.py b/fed/tests/test_retry_policy.py similarity index 100% rename from tests/test_retry_policy.py rename to fed/tests/test_retry_policy.py diff --git a/tests/test_setup_proxy_actor.py b/fed/tests/test_setup_proxy_actor.py similarity index 100% rename from tests/test_setup_proxy_actor.py rename to fed/tests/test_setup_proxy_actor.py diff --git a/tests/test_transport_proxy.py b/fed/tests/test_transport_proxy.py similarity index 100% rename from tests/test_transport_proxy.py rename to fed/tests/test_transport_proxy.py diff --git a/tests/test_transport_proxy_tls.py b/fed/tests/test_transport_proxy_tls.py similarity index 100% rename from tests/test_transport_proxy_tls.py rename to fed/tests/test_transport_proxy_tls.py diff --git a/tests/test_utils.py b/fed/tests/test_utils.py similarity index 100% rename from tests/test_utils.py rename to fed/tests/test_utils.py diff --git a/tests/without_ray_tests/test_tree_utils.py b/fed/tests/without_ray_tests/test_tree_utils.py similarity index 100% rename from tests/without_ray_tests/test_tree_utils.py rename to fed/tests/without_ray_tests/test_tree_utils.py diff --git a/tests/without_ray_tests/test_utils.py b/fed/tests/without_ray_tests/test_utils.py similarity index 100% rename from tests/without_ray_tests/test_utils.py rename to fed/tests/without_ray_tests/test_utils.py From 3e8ff3a34bdd3a81309fae27cbff81fadddd1a0d Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 16:58:54 +0800 Subject: [PATCH 14/24] Fix lint. --- fed/tests/test_transport_proxy.py | 1 - fed/tests/test_utils.py | 2 +- fed/utils.py | 9 +++++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/fed/tests/test_transport_proxy.py b/fed/tests/test_transport_proxy.py index 9ebb787..55a9b56 100644 --- a/fed/tests/test_transport_proxy.py +++ b/fed/tests/test_transport_proxy.py @@ -22,7 +22,6 @@ import fed._private.compatible_utils as compatible_utils import fed.utils as fed_utils from fed._private import constants, global_context -from fed.config import CrossSiloMessageConfig, GrpcCrossSiloMessageConfig from fed.proxy.barriers import ( _start_receiver_proxy, _start_sender_proxy, diff --git a/fed/tests/test_utils.py b/fed/tests/test_utils.py index be70402..934c8bc 100644 --- a/fed/tests/test_utils.py +++ b/fed/tests/test_utils.py @@ -8,7 +8,7 @@ def start_ray_cluster( ray_port, client_server_port, dashboard_port, - ): +): command = [ 'ray', 'start', diff --git a/fed/utils.py b/fed/utils.py index efc4c59..b5450f2 100644 --- a/fed/utils.py +++ b/fed/utils.py @@ -243,8 +243,13 @@ def start_command(command: str, timeout=60) : """ A util to start a shell command. """ - process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) output, error = process.communicate(timeout=timeout) if len(error) != 0: - raise RuntimeError(f'Failed to start command [{command}], the error is:\n {error.decode()}') + raise RuntimeError( + f'Failed to start command [{command}], the error is:\n {error.decode()}') return output From 065e506cfaf1bda7107dea24bd88b3411a6b45d5 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 17:04:47 +0800 Subject: [PATCH 15/24] Fix unit tests. --- test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test.sh b/test.sh index 7199064..d816149 100755 --- a/test.sh +++ b/test.sh @@ -10,7 +10,7 @@ export RAY_TLS_SERVER_CERT="/tmp/rayfed/test-certs/server.crt" export RAY_TLS_SERVER_KEY="/tmp/rayfed/test-certs/server.key" export RAY_TLS_CA_CERT="/tmp/rayfed/test-certs/server.crt" -cd tests +cd fed/tests python3 -m pytest -v -s test_* python3 -m pytest -v -s serializations_tests/test_* python3 -m pytest -v -s without_ray_tests/test_* From 8e6b8532fd70690804f21baa385f9d033c85fe85 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 17:06:39 +0800 Subject: [PATCH 16/24] Fix client mode tests. --- test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/test.sh b/test.sh index d816149..312b53e 100755 --- a/test.sh +++ b/test.sh @@ -14,6 +14,7 @@ cd fed/tests python3 -m pytest -v -s test_* python3 -m pytest -v -s serializations_tests/test_* python3 -m pytest -v -s without_ray_tests/test_* +python3 -m pytest -v -s client_mode_tests/test_* cd - echo "All tests finished." From 355aab3ba637213ff90fd4e97e43f9f5f5f21e50 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 17:24:35 +0800 Subject: [PATCH 17/24] 1 --- fed/tests/test_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/fed/tests/test_utils.py b/fed/tests/test_utils.py index 934c8bc..16eacb8 100644 --- a/fed/tests/test_utils.py +++ b/fed/tests/test_utils.py @@ -21,7 +21,6 @@ def start_ray_cluster( _ = fed_utils.start_command(command_str) -@pytest.fixture def ray_client_mode_setup(): # Start 2 Ray clusters. start_ray_cluster(ray_port=41012, client_server_port=21012, dashboard_port=9112) From 7a3b37efdd539ab78da105455ce66edc90edd5c5 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 17:26:40 +0800 Subject: [PATCH 18/24] 1 --- fed/tests/client_mode_tests/test_basic_client_mode.py | 6 +++--- fed/tests/test_utils.py | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/fed/tests/client_mode_tests/test_basic_client_mode.py b/fed/tests/client_mode_tests/test_basic_client_mode.py index 62d177a..121066c 100644 --- a/fed/tests/client_mode_tests/test_basic_client_mode.py +++ b/fed/tests/client_mode_tests/test_basic_client_mode.py @@ -18,7 +18,7 @@ import ray import fed import fed._private.compatible_utils as compatible_utils -import fed.tests.test_utils as fed_test_utils +from fed.tests.test_utils import ray_client_mode_setup @fed.remote @@ -83,8 +83,8 @@ def run(party): ray.shutdown() -@pytest.fixture(fed_test_utils.ray_client_mode_setup) -def test_fed_get_in_2_parties(): + +def test_fed_get_in_2_parties(ray_client_mode_setup): p_alice = multiprocessing.Process(target=run, args=('alice',)) p_bob = multiprocessing.Process(target=run, args=('bob',)) diff --git a/fed/tests/test_utils.py b/fed/tests/test_utils.py index 16eacb8..934c8bc 100644 --- a/fed/tests/test_utils.py +++ b/fed/tests/test_utils.py @@ -21,6 +21,7 @@ def start_ray_cluster( _ = fed_utils.start_command(command_str) +@pytest.fixture def ray_client_mode_setup(): # Start 2 Ray clusters. start_ray_cluster(ray_port=41012, client_server_port=21012, dashboard_port=9112) From 0ea5d40cf9f52421dfef9ffc6d178414bdd2454e Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 17:34:58 +0800 Subject: [PATCH 19/24] Fix2 --- fed/tests/client_mode_tests/test_basic_client_mode.py | 5 ++--- fed/tests/test_utils.py | 2 -- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/fed/tests/client_mode_tests/test_basic_client_mode.py b/fed/tests/client_mode_tests/test_basic_client_mode.py index 121066c..d4c0afa 100644 --- a/fed/tests/client_mode_tests/test_basic_client_mode.py +++ b/fed/tests/client_mode_tests/test_basic_client_mode.py @@ -83,9 +83,8 @@ def run(party): ray.shutdown() - -def test_fed_get_in_2_parties(ray_client_mode_setup): - +@pytest.fixture(ray_client_mode_setup) +def test_fed_get_in_2_parties(): p_alice = multiprocessing.Process(target=run, args=('alice',)) p_bob = multiprocessing.Process(target=run, args=('bob',)) p_alice.start() diff --git a/fed/tests/test_utils.py b/fed/tests/test_utils.py index 934c8bc..01b8f7b 100644 --- a/fed/tests/test_utils.py +++ b/fed/tests/test_utils.py @@ -1,4 +1,3 @@ -import pytest import time import fed.utils as fed_utils @@ -21,7 +20,6 @@ def start_ray_cluster( _ = fed_utils.start_command(command_str) -@pytest.fixture def ray_client_mode_setup(): # Start 2 Ray clusters. start_ray_cluster(ray_port=41012, client_server_port=21012, dashboard_port=9112) From 80c08daf89bef9a13687b686669ddcace1fbca39 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 18:00:39 +0800 Subject: [PATCH 20/24] Fix client mode test. --- fed/tests/client_mode_tests/test_basic_client_mode.py | 5 ++--- fed/tests/test_utils.py | 4 +++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/fed/tests/client_mode_tests/test_basic_client_mode.py b/fed/tests/client_mode_tests/test_basic_client_mode.py index d4c0afa..9807802 100644 --- a/fed/tests/client_mode_tests/test_basic_client_mode.py +++ b/fed/tests/client_mode_tests/test_basic_client_mode.py @@ -18,7 +18,7 @@ import ray import fed import fed._private.compatible_utils as compatible_utils -from fed.tests.test_utils import ray_client_mode_setup +from fed.tests.test_utils import ray_client_mode_setup # noqa @fed.remote @@ -83,8 +83,7 @@ def run(party): ray.shutdown() -@pytest.fixture(ray_client_mode_setup) -def test_fed_get_in_2_parties(): +def test_fed_get_in_2_parties(ray_client_mode_setup): # noqa p_alice = multiprocessing.Process(target=run, args=('alice',)) p_bob = multiprocessing.Process(target=run, args=('bob',)) p_alice.start() diff --git a/fed/tests/test_utils.py b/fed/tests/test_utils.py index 01b8f7b..596a03e 100644 --- a/fed/tests/test_utils.py +++ b/fed/tests/test_utils.py @@ -1,4 +1,5 @@ import time +import pytest import fed.utils as fed_utils @@ -16,10 +17,11 @@ def start_ray_cluster( f'--ray-client-server-port={client_server_port}', f'--dashboard-port={dashboard_port}', ] - command_str = str.join(command) + command_str = ' '.join(command) _ = fed_utils.start_command(command_str) +@pytest.fixture def ray_client_mode_setup(): # Start 2 Ray clusters. start_ray_cluster(ray_port=41012, client_server_port=21012, dashboard_port=9112) From 97b1adc8d129f1a73c90103f7cf1ab09f5f6c0ad Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 18:02:26 +0800 Subject: [PATCH 21/24] Add lincense header. --- fed/tests/test_utils.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/fed/tests/test_utils.py b/fed/tests/test_utils.py index 596a03e..35ceced 100644 --- a/fed/tests/test_utils.py +++ b/fed/tests/test_utils.py @@ -1,3 +1,17 @@ +# Copyright 2023 The RayFed Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import time import pytest From 25c3b29327fb79156b2ed22a8292482cd8410fbd Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 20:55:04 +0800 Subject: [PATCH 22/24] Fix in CI env --- fed/tests/test_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fed/tests/test_utils.py b/fed/tests/test_utils.py index 35ceced..fa66c3c 100644 --- a/fed/tests/test_utils.py +++ b/fed/tests/test_utils.py @@ -45,7 +45,8 @@ def ray_client_mode_setup(): start_ray_cluster(ray_port=41011, client_server_port=21011, dashboard_port=9111) except RuntimeError as e: # A successful case. - assert 'Overwriting previous Ray address' in str(e) + assert 'Overwriting previous Ray address' in str(e) \ + or 'WARNING: The object store is using /tmp instead of /dev/shm' in str(e) yield fed_utils.start_command('ray stop --force') From 5b018b1d6438d8c6d398b9b97f3809b3d0478c00 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 21:14:06 +0800 Subject: [PATCH 23/24] Fix ci --- fed/tests/test_utils.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/fed/tests/test_utils.py b/fed/tests/test_utils.py index fa66c3c..7ce014e 100644 --- a/fed/tests/test_utils.py +++ b/fed/tests/test_utils.py @@ -44,9 +44,18 @@ def ray_client_mode_setup(): try: start_ray_cluster(ray_port=41011, client_server_port=21011, dashboard_port=9111) except RuntimeError as e: - # A successful case. + # As we should treat the following warning messages is ok to use. + # E RuntimeError: Failed to start command [ray start --head --port=41012 + # --ray-client-server-port=21012 --dashboard-port=9112], the error is: + # E 2023-09-13 13:04:11,520 WARNING services.py:1882 -- WARNING: The + # object store is using /tmp instead of /dev/shm because /dev/shm has only 67108864 + # bytes available. This will harm performance! You may be able to free up space by + # deleting files in /dev/shm. If you are inside a Docker container, you can increase + # /dev/shm size by passing '--shm-size=1.97gb' to 'docker run' (or add it to the + # run_options list in a Ray cluster config). Make sure to set this to more than + # 0% of available RAM. assert 'Overwriting previous Ray address' in str(e) \ - or 'WARNING: The object store is using /tmp instead of /dev/shm' in str(e) + or 'The object store is using' in str(e) yield fed_utils.start_command('ray stop --force') From 7c5f9f46795af4e2270cbcc7d39c1465cb9cfd34 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Wed, 13 Sep 2023 21:27:33 +0800 Subject: [PATCH 24/24] Fix CI in linux. --- fed/tests/test_utils.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/fed/tests/test_utils.py b/fed/tests/test_utils.py index 7ce014e..f17f1a6 100644 --- a/fed/tests/test_utils.py +++ b/fed/tests/test_utils.py @@ -32,30 +32,29 @@ def start_ray_cluster( f'--dashboard-port={dashboard_port}', ] command_str = ' '.join(command) - _ = fed_utils.start_command(command_str) + try: + _ = fed_utils.start_command(command_str) + except RuntimeError as e: + # As we should treat the following warning messages is ok to use. + # E RuntimeError: Failed to start command [ray start --head --port=41012 + # --ray-client-server-port=21012 --dashboard-port=9112], the error is: + # E 2023-09-13 13:04:11,520 WARNING services.py:1882 -- WARNING: The + # object store is using /tmp instead of /dev/shm because /dev/shm has only + # 67108864 bytes available. This will harm performance! You may be able to + # free up space by deleting files in /dev/shm. If you are inside a Docker + # container, you can increase /dev/shm size by passing '--shm-size=1.97gb' to + # 'docker run' (or add it to the run_options list in a Ray cluster config). + # Make sure to set this to more than 0% of available RAM. + assert 'Overwriting previous Ray address' in str(e) \ + or 'WARNING: The object store is using /tmp instead of /dev/shm' in str(e) @pytest.fixture def ray_client_mode_setup(): # Start 2 Ray clusters. start_ray_cluster(ray_port=41012, client_server_port=21012, dashboard_port=9112) - time.sleep(1) - try: - start_ray_cluster(ray_port=41011, client_server_port=21011, dashboard_port=9111) - except RuntimeError as e: - # As we should treat the following warning messages is ok to use. - # E RuntimeError: Failed to start command [ray start --head --port=41012 - # --ray-client-server-port=21012 --dashboard-port=9112], the error is: - # E 2023-09-13 13:04:11,520 WARNING services.py:1882 -- WARNING: The - # object store is using /tmp instead of /dev/shm because /dev/shm has only 67108864 - # bytes available. This will harm performance! You may be able to free up space by - # deleting files in /dev/shm. If you are inside a Docker container, you can increase - # /dev/shm size by passing '--shm-size=1.97gb' to 'docker run' (or add it to the - # run_options list in a Ray cluster config). Make sure to set this to more than - # 0% of available RAM. - assert 'Overwriting previous Ray address' in str(e) \ - or 'The object store is using' in str(e) + start_ray_cluster(ray_port=41011, client_server_port=21011, dashboard_port=9111) yield fed_utils.start_command('ray stop --force')