From bbc4bc2c45024f72ee02037427a5f0192ed70ef3 Mon Sep 17 00:00:00 2001 From: zhouaihui Date: Thu, 18 Jan 2024 11:26:32 +0800 Subject: [PATCH] feat: unittest on more ray versions. (#194) --- .../unit_tests_for_protobuf_matrix.yml | 2 +- .../workflows/unit_tests_on_ray_matrix.yml | 9 +- README.md | 4 +- benchmarks/many_tiny_tasks_benchmark.py | 2 +- dev-requirements.txt | 5 +- fed/tests/client_mode_tests/__init__.py | 0 .../test_basic_client_mode.py | 9 ++ fed/tests/multi-jobs/__init__.py | 0 .../multi-jobs/test_ignore_other_job_msg.py | 118 ++++++------------ .../multi-jobs/test_multi_proxy_actor.py | 96 +++----------- fed/tests/serializations_tests/__init__.py | 0 .../test_unpickle_with_whitelist.py | 6 +- fed/tests/simple_example.py | 2 +- fed/tests/test_exit_on_failure_sending.py | 4 +- fed/tests/test_internal_kv.py | 4 +- fed/tests/test_utils.py | 1 + fed/tests/without_ray_tests/__init__.py | 0 requirements.txt | 1 + test.sh | 10 +- 19 files changed, 87 insertions(+), 186 deletions(-) create mode 100644 fed/tests/client_mode_tests/__init__.py create mode 100644 fed/tests/multi-jobs/__init__.py create mode 100644 fed/tests/serializations_tests/__init__.py create mode 100644 fed/tests/without_ray_tests/__init__.py diff --git a/.github/workflows/unit_tests_for_protobuf_matrix.yml b/.github/workflows/unit_tests_for_protobuf_matrix.yml index 3d9ae4c7..97074003 100644 --- a/.github/workflows/unit_tests_for_protobuf_matrix.yml +++ b/.github/workflows/unit_tests_for_protobuf_matrix.yml @@ -31,7 +31,7 @@ jobs: python3 -m virtualenv -p python3 py3 . py3/bin/activate which python - pip install pytest torch cloudpickle cryptography + pip install pytest torch cloudpickle cryptography numpy pip install protobuf==${{ matrix.protobuf_ver }} pip install ray==2.4.0 diff --git a/.github/workflows/unit_tests_on_ray_matrix.yml b/.github/workflows/unit_tests_on_ray_matrix.yml index 20eb9a50..6c54124c 100644 --- a/.github/workflows/unit_tests_on_ray_matrix.yml +++ b/.github/workflows/unit_tests_on_ray_matrix.yml @@ -11,7 +11,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - ray_version: [2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0] + ray_version: [2.4.0, 2.5.1, 2.6.3, 2.7.1, 2.8.1, 2.9.0] timeout-minutes: 60 runs-on: ubuntu-latest @@ -32,10 +32,11 @@ jobs: python3 -m virtualenv -p python3 py3 . py3/bin/activate which python - pip install pytest torch cloudpickle cryptography - # six is required in ray-2.1.0 - pip install "protobuf<4.0" six + pip install pytest + pip install -r dev-requirements.txt pip install ray==${{ matrix.ray_version }} + grep -ivE "ray" requirements.txt > temp_requirement.txt + pip install -r temp_requirement.txt - name: Build and test run: | diff --git a/README.md b/README.md index 86bf867b..804e2863 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,7 @@ The above codes: ### Step 4: Declare Cross-party Cluster & Init ```python def main(party): - ray.init(address='local') + ray.init(address='local', include_dashboard=False) addresses = { 'alice': '127.0.0.1:11012', @@ -143,7 +143,7 @@ def aggregate(val1, val2): def main(party): - ray.init(address='local') + ray.init(address='local', include_dashboard=False) addresses = { 'alice': '127.0.0.1:11012', diff --git a/benchmarks/many_tiny_tasks_benchmark.py b/benchmarks/many_tiny_tasks_benchmark.py index ae450041..654dce4b 100644 --- a/benchmarks/many_tiny_tasks_benchmark.py +++ b/benchmarks/many_tiny_tasks_benchmark.py @@ -33,7 +33,7 @@ def aggr(self, val1, val2): def main(party): - ray.init(address='local') + ray.init(address='local', include_dashboard=False) addresses = { 'alice': '127.0.0.1:11010', diff --git a/dev-requirements.txt b/dev-requirements.txt index 71f7ae14..14c21c06 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,5 +1,2 @@ -click -pandas -tensorflow>=2 -scikit-learn cryptography +numpy \ No newline at end of file diff --git a/fed/tests/client_mode_tests/__init__.py b/fed/tests/client_mode_tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fed/tests/client_mode_tests/test_basic_client_mode.py b/fed/tests/client_mode_tests/test_basic_client_mode.py index 35f3d851..04cf6576 100644 --- a/fed/tests/client_mode_tests/test_basic_client_mode.py +++ b/fed/tests/client_mode_tests/test_basic_client_mode.py @@ -19,8 +19,17 @@ import fed import fed._private.compatible_utils as compatible_utils +from fed._private.compatible_utils import _compare_version_strings from fed.tests.test_utils import ray_client_mode_setup # noqa +pytestmark = pytest.mark.skipif( + _compare_version_strings( + ray.__version__, + '2.4.0', + ), + reason='Skip client mode when ray > 2.4.0', +) + @fed.remote class MyModel: diff --git a/fed/tests/multi-jobs/__init__.py b/fed/tests/multi-jobs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fed/tests/multi-jobs/test_ignore_other_job_msg.py b/fed/tests/multi-jobs/test_ignore_other_job_msg.py index d83acff3..a02f06e7 100644 --- a/fed/tests/multi-jobs/test_ignore_other_job_msg.py +++ b/fed/tests/multi-jobs/test_ignore_other_job_msg.py @@ -12,110 +12,68 @@ # See the License for the specific language governing permissions and # limitations under the License. -import multiprocessing +import cloudpickle import grpc import pytest import ray +import multiprocessing -import fed import fed._private.compatible_utils as compatible_utils import fed.utils as fed_utils -from fed.proxy.grpc.grpc_proxy import GrpcSenderProxy, send_data_grpc +from fed.proxy.barriers import ReceiverProxyActor +from fed.proxy.grpc.grpc_proxy import GrpcReceiverProxy if compatible_utils._compare_version_strings( fed_utils.get_package_version('protobuf'), '4.0.0' ): + from fed.grpc.pb4 import fed_pb2 as fed_pb2 from fed.grpc.pb4 import fed_pb2_grpc as fed_pb2_grpc else: + from fed.grpc.pb3 import fed_pb2 as fed_pb2 from fed.grpc.pb3 import fed_pb2_grpc as fed_pb2_grpc -class TestGrpcSenderProxy(GrpcSenderProxy): - async def send(self, dest_party, data, upstream_seq_id, downstream_seq_id): - dest_addr = self._addresses[dest_party] - grpc_metadata, grpc_channel_options = self.get_grpc_config_by_party(dest_party) - if dest_party not in self._stubs: - channel = grpc.aio.insecure_channel(dest_addr, options=grpc_channel_options) - stub = fed_pb2_grpc.GrpcServiceStub(channel) - self._stubs[dest_party] = stub - - timeout = self._proxy_config.timeout_in_ms / 1000 - response: str = await send_data_grpc( - data=data, - stub=self._stubs[dest_party], - upstream_seq_id=upstream_seq_id, - downstream_seq_id=downstream_seq_id, - job_name=self._job_name, - timeout=timeout, - metadata=grpc_metadata, - ) - assert response.code == 417 - assert "JobName mis-match" in response.result - # So that process can exit - raise RuntimeError(response.result) - - -@fed.remote -class MyActor: - def __init__(self, party, data): - self.__data = data - self._party = party - - def f(self): - return f"f({self._party}, ip is {ray.util.get_node_ip_address()})" - - -@fed.remote -def agg_fn(obj1, obj2): - return f"agg-{obj1}-{obj2}" - - -addresses = { - 'alice': '127.0.0.1:11012', - 'bob': '127.0.0.1:11011', -} - - -def run(party, job_name): - ray.init(address='local') - fed.init( - addresses=addresses, - party=party, - job_name=job_name, - sender_proxy_cls=TestGrpcSenderProxy, - config={ - 'cross_silo_comm': { - 'exit_on_sending_failure': True, - } - }, +def run(): + # GIVEN + ray.init(address='local', include_dashboard=False) + address = '127.0.0.1:15111' + receiver_proxy_actor = ReceiverProxyActor.remote( + listening_address=address, + party='alice', + job_name='job1', + logging_level='info', + proxy_cls=GrpcReceiverProxy, ) - # 'bob' only needs to start the proxy actors - if party == 'alice': - ds1, ds2 = [123, 789] - actor_alice = MyActor.party("alice").remote(party, ds1) - actor_bob = MyActor.party("bob").remote(party, ds2) - - obj_alice_f = actor_alice.f.remote() - obj_bob_f = actor_bob.f.remote() + receiver_proxy_actor.start.remote() + server_state = ray.get(receiver_proxy_actor.is_ready.remote(), timeout=60) + assert server_state[0], server_state[1] + + # WHEN + channel = grpc.insecure_channel(address) + stub = fed_pb2_grpc.GrpcServiceStub(channel) + + data = cloudpickle.dumps('data') + request = fed_pb2.SendDataRequest( + data=data, + upstream_seq_id=str(1), + downstream_seq_id=str(2), + job_name='job2', + ) + response = stub.SendData(request) - obj = agg_fn.party("bob").remote(obj_alice_f, obj_bob_f) - fed.get(obj) - fed.shutdown() - ray.shutdown() - import time + # THEN + assert response.code == 417 + assert "JobName mis-match" in response.result - # Wait for SIGTERM as failure on sending. - time.sleep(86400) + ray.shutdown() def test_ignore_other_job_msg(): - p_alice = multiprocessing.Process(target=run, args=('alice', 'job1')) - p_bob = multiprocessing.Process(target=run, args=('bob', 'job2')) + p_alice = multiprocessing.Process(target=run) p_alice.start() - p_bob.start() p_alice.join() - p_bob.join() + assert p_alice.exitcode == 0 if __name__ == "__main__": diff --git a/fed/tests/multi-jobs/test_multi_proxy_actor.py b/fed/tests/multi-jobs/test_multi_proxy_actor.py index a540086c..287b34e7 100644 --- a/fed/tests/multi-jobs/test_multi_proxy_actor.py +++ b/fed/tests/multi-jobs/test_multi_proxy_actor.py @@ -12,84 +12,26 @@ # See the License for the specific language governing permissions and # limitations under the License. -import multiprocessing -import grpc +import multiprocessing import pytest import ray import fed -import fed._private.compatible_utils as compatible_utils -import fed.utils as fed_utils -from fed.proxy.grpc.grpc_proxy import GrpcSenderProxy, send_data_grpc - -if compatible_utils._compare_version_strings( - fed_utils.get_package_version('protobuf'), '4.0.0' -): - from fed.grpc.pb4 import fed_pb2_grpc as fed_pb2_grpc -else: - from fed.grpc.pb3 import fed_pb2_grpc as fed_pb2_grpc - - -class TestGrpcSenderProxy(GrpcSenderProxy): - async def send(self, dest_party, data, upstream_seq_id, downstream_seq_id): - dest_addr = self._addresses[dest_party] - grpc_metadata, grpc_channel_options = self.get_grpc_config_by_party(dest_party) - if dest_party not in self._stubs: - channel = grpc.aio.insecure_channel(dest_addr, options=grpc_channel_options) - stub = fed_pb2_grpc.GrpcServiceStub(channel) - self._stubs[dest_party] = stub - - timeout = self._proxy_config.timeout_in_ms / 1000 - response: str = await send_data_grpc( - data=data, - stub=self._stubs[dest_party], - upstream_seq_id=upstream_seq_id, - downstream_seq_id=downstream_seq_id, - job_name=self._job_name, - timeout=timeout, - metadata=grpc_metadata, - ) - assert response.code == 417 - assert "JobName mis-match" in response.result - # So that process can exit - raise RuntimeError(response.result) - +from fed.proxy.barriers import receiver_proxy_actor_name, sender_proxy_actor_name +from fed.proxy.grpc.grpc_proxy import GrpcSenderProxy -@fed.remote -class MyActor: - def __init__(self, party, data): - self.__data = data - self._party = party - def f(self): - return f"f({self._party}, ip is {ray.util.get_node_ip_address()})" - - -@fed.remote -def agg_fn(obj1, obj2): - return f"agg-{obj1}-{obj2}" - - -addresses = { - 'job1': { - 'alice': '127.0.0.1:11012', - 'bob': '127.0.0.1:11011', - }, - 'job2': { - 'alice': '127.0.0.1:12012', - 'bob': '127.0.0.1:12011', - }, -} - - -def run(party, job_name): - ray.init(address='local') +def run(): + job_name = 'job_test' + ray.init(address='local', include_dashboard=False) fed.init( - addresses=addresses[job_name], - party=party, + addresses={ + 'alice': '127.0.0.1:11012', + }, + party='alice', job_name=job_name, - sender_proxy_cls=TestGrpcSenderProxy, + sender_proxy_cls=GrpcSenderProxy, config={ 'cross_silo_comm': { 'exit_on_sending_failure': True, @@ -99,22 +41,18 @@ def run(party, job_name): }, ) - sender_proxy_actor_name = f"SenderProxyActor_{job_name}" - receiver_proxy_actor_name = f"ReceiverProxyActor_{job_name}" - assert ray.get_actor(sender_proxy_actor_name) - assert ray.get_actor(receiver_proxy_actor_name) + assert ray.get_actor(sender_proxy_actor_name()) + assert ray.get_actor(receiver_proxy_actor_name()) fed.shutdown() ray.shutdown() def test_multi_proxy_actor(): - p_alice_job1 = multiprocessing.Process(target=run, args=('alice', 'job1')) - p_alice_job2 = multiprocessing.Process(target=run, args=('alice', 'job2')) - p_alice_job1.start() - p_alice_job2.start() - p_alice_job1.join() - p_alice_job2.join() + p_alice = multiprocessing.Process(target=run) + p_alice.start() + p_alice.join() + assert p_alice.exitcode == 0 if __name__ == "__main__": diff --git a/fed/tests/serializations_tests/__init__.py b/fed/tests/serializations_tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fed/tests/serializations_tests/test_unpickle_with_whitelist.py b/fed/tests/serializations_tests/test_unpickle_with_whitelist.py index 43e3c3ff..61f7fe21 100644 --- a/fed/tests/serializations_tests/test_unpickle_with_whitelist.py +++ b/fed/tests/serializations_tests/test_unpickle_with_whitelist.py @@ -41,10 +41,10 @@ def pass_arg(d): def run(party): - compatible_utils.init_ray(address='local') + compatible_utils.init_ray(address='local', include_dashboard=False) addresses = { - 'alice': '127.0.0.1:11012', - 'bob': '127.0.0.1:11011', + 'alice': '127.0.0.1:11355', + 'bob': '127.0.0.1:11356', } allowed_list = { "numpy.core.numeric": ["*"], diff --git a/fed/tests/simple_example.py b/fed/tests/simple_example.py index 88c9ce00..8cf4710c 100644 --- a/fed/tests/simple_example.py +++ b/fed/tests/simple_example.py @@ -51,7 +51,7 @@ def agg_fn(obj1, obj2): def run(party): - ray.init(address='local') + ray.init(address='local', include_dashboard=False) fed.init(addresses=addresses, party=party) print(f"Running the script in party {party}") diff --git a/fed/tests/test_exit_on_failure_sending.py b/fed/tests/test_exit_on_failure_sending.py index 8414b486..d630f189 100644 --- a/fed/tests/test_exit_on_failure_sending.py +++ b/fed/tests/test_exit_on_failure_sending.py @@ -38,8 +38,8 @@ def get_value(self): def run(party: str, q: multiprocessing.Queue): compatible_utils.init_ray(address='local') addresses = { - 'alice': '127.0.0.1:11012', - 'bob': '127.0.0.1:11011', + 'alice': '127.0.0.1:21321', + 'bob': '127.0.0.1:21322', } retry_policy = { "maxAttempts": 2, diff --git a/fed/tests/test_internal_kv.py b/fed/tests/test_internal_kv.py index 65a6e4fa..89176a01 100644 --- a/fed/tests/test_internal_kv.py +++ b/fed/tests/test_internal_kv.py @@ -12,8 +12,8 @@ def run(party): compatible_utils.init_ray("local") addresses = { - 'alice': '127.0.0.1:11010', - 'bob': '127.0.0.1:11011', + 'alice': '127.0.0.1:12012', + 'bob': '127.0.0.1:12011', } assert compatible_utils.kv is None fed.init(addresses=addresses, party=party, job_name="test_job_name") diff --git a/fed/tests/test_utils.py b/fed/tests/test_utils.py index d42bde8b..dc03fbec 100644 --- a/fed/tests/test_utils.py +++ b/fed/tests/test_utils.py @@ -30,6 +30,7 @@ def start_ray_cluster( '--head', f'--port={ray_port}', f'--ray-client-server-port={client_server_port}', + f'--include-dashboard=false', f'--dashboard-port={dashboard_port}', ] command_str = ' '.join(command) diff --git a/fed/tests/without_ray_tests/__init__.py b/fed/tests/without_ray_tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/requirements.txt b/requirements.txt index e8701bfc..6b50e846 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ ray>=1.13.0 cloudpickle pickle5==0.0.11; python_version < '3.8' protobuf +grpcio diff --git a/test.sh b/test.sh index 740b656c..963db39e 100755 --- a/test.sh +++ b/test.sh @@ -10,12 +10,8 @@ export RAY_TLS_SERVER_CERT="/tmp/rayfed/test-certs/server.crt" export RAY_TLS_SERVER_KEY="/tmp/rayfed/test-certs/server.key" export RAY_TLS_CA_CERT="/tmp/rayfed/test-certs/server.crt" -cd fed/tests -python3 -m pytest -v -s test_* -python3 -m pytest -v -s serializations_tests/test_* -python3 -m pytest -v -s multi-jobs/test_* -python3 -m pytest -v -s without_ray_tests/test_* -python3 -m pytest -v -s client_mode_tests/test_* -cd - +directory="fed/tests" + +find "$directory" -type f -name "test_*.py" -exec pytest -vs {} \; echo "All tests finished."