Skip to content

Commit

Permalink
feat: unittest on more ray versions. (#194)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouaihui committed Jan 18, 2024
1 parent 5a63f1b commit bbc4bc2
Show file tree
Hide file tree
Showing 19 changed files with 87 additions and 186 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unit_tests_for_protobuf_matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
python3 -m virtualenv -p python3 py3
. py3/bin/activate
which python
pip install pytest torch cloudpickle cryptography
pip install pytest torch cloudpickle cryptography numpy
pip install protobuf==${{ matrix.protobuf_ver }}
pip install ray==2.4.0
Expand Down
9 changes: 5 additions & 4 deletions .github/workflows/unit_tests_on_ray_matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
ray_version: [2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0]
ray_version: [2.4.0, 2.5.1, 2.6.3, 2.7.1, 2.8.1, 2.9.0]

timeout-minutes: 60
runs-on: ubuntu-latest
Expand All @@ -32,10 +32,11 @@ jobs:
python3 -m virtualenv -p python3 py3
. py3/bin/activate
which python
pip install pytest torch cloudpickle cryptography
# six is required in ray-2.1.0
pip install "protobuf<4.0" six
pip install pytest
pip install -r dev-requirements.txt
pip install ray==${{ matrix.ray_version }}
grep -ivE "ray" requirements.txt > temp_requirement.txt
pip install -r temp_requirement.txt
- name: Build and test
run: |
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ The above codes:
### Step 4: Declare Cross-party Cluster & Init
```python
def main(party):
ray.init(address='local')
ray.init(address='local', include_dashboard=False)

addresses = {
'alice': '127.0.0.1:11012',
Expand Down Expand Up @@ -143,7 +143,7 @@ def aggregate(val1, val2):


def main(party):
ray.init(address='local')
ray.init(address='local', include_dashboard=False)

addresses = {
'alice': '127.0.0.1:11012',
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/many_tiny_tasks_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def aggr(self, val1, val2):


def main(party):
ray.init(address='local')
ray.init(address='local', include_dashboard=False)

addresses = {
'alice': '127.0.0.1:11010',
Expand Down
5 changes: 1 addition & 4 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
click
pandas
tensorflow>=2
scikit-learn
cryptography
numpy
Empty file.
9 changes: 9 additions & 0 deletions fed/tests/client_mode_tests/test_basic_client_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,17 @@

import fed
import fed._private.compatible_utils as compatible_utils
from fed._private.compatible_utils import _compare_version_strings
from fed.tests.test_utils import ray_client_mode_setup # noqa

pytestmark = pytest.mark.skipif(
_compare_version_strings(
ray.__version__,
'2.4.0',
),
reason='Skip client mode when ray > 2.4.0',
)


@fed.remote
class MyModel:
Expand Down
Empty file.
118 changes: 38 additions & 80 deletions fed/tests/multi-jobs/test_ignore_other_job_msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,110 +12,68 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing

import cloudpickle
import grpc
import pytest
import ray
import multiprocessing

import fed
import fed._private.compatible_utils as compatible_utils
import fed.utils as fed_utils
from fed.proxy.grpc.grpc_proxy import GrpcSenderProxy, send_data_grpc
from fed.proxy.barriers import ReceiverProxyActor
from fed.proxy.grpc.grpc_proxy import GrpcReceiverProxy

if compatible_utils._compare_version_strings(
fed_utils.get_package_version('protobuf'), '4.0.0'
):
from fed.grpc.pb4 import fed_pb2 as fed_pb2
from fed.grpc.pb4 import fed_pb2_grpc as fed_pb2_grpc
else:
from fed.grpc.pb3 import fed_pb2 as fed_pb2
from fed.grpc.pb3 import fed_pb2_grpc as fed_pb2_grpc


class TestGrpcSenderProxy(GrpcSenderProxy):
async def send(self, dest_party, data, upstream_seq_id, downstream_seq_id):
dest_addr = self._addresses[dest_party]
grpc_metadata, grpc_channel_options = self.get_grpc_config_by_party(dest_party)
if dest_party not in self._stubs:
channel = grpc.aio.insecure_channel(dest_addr, options=grpc_channel_options)
stub = fed_pb2_grpc.GrpcServiceStub(channel)
self._stubs[dest_party] = stub

timeout = self._proxy_config.timeout_in_ms / 1000
response: str = await send_data_grpc(
data=data,
stub=self._stubs[dest_party],
upstream_seq_id=upstream_seq_id,
downstream_seq_id=downstream_seq_id,
job_name=self._job_name,
timeout=timeout,
metadata=grpc_metadata,
)
assert response.code == 417
assert "JobName mis-match" in response.result
# So that process can exit
raise RuntimeError(response.result)


@fed.remote
class MyActor:
def __init__(self, party, data):
self.__data = data
self._party = party

def f(self):
return f"f({self._party}, ip is {ray.util.get_node_ip_address()})"


@fed.remote
def agg_fn(obj1, obj2):
return f"agg-{obj1}-{obj2}"


addresses = {
'alice': '127.0.0.1:11012',
'bob': '127.0.0.1:11011',
}


def run(party, job_name):
ray.init(address='local')
fed.init(
addresses=addresses,
party=party,
job_name=job_name,
sender_proxy_cls=TestGrpcSenderProxy,
config={
'cross_silo_comm': {
'exit_on_sending_failure': True,
}
},
def run():
# GIVEN
ray.init(address='local', include_dashboard=False)
address = '127.0.0.1:15111'
receiver_proxy_actor = ReceiverProxyActor.remote(
listening_address=address,
party='alice',
job_name='job1',
logging_level='info',
proxy_cls=GrpcReceiverProxy,
)
# 'bob' only needs to start the proxy actors
if party == 'alice':
ds1, ds2 = [123, 789]
actor_alice = MyActor.party("alice").remote(party, ds1)
actor_bob = MyActor.party("bob").remote(party, ds2)

obj_alice_f = actor_alice.f.remote()
obj_bob_f = actor_bob.f.remote()
receiver_proxy_actor.start.remote()
server_state = ray.get(receiver_proxy_actor.is_ready.remote(), timeout=60)
assert server_state[0], server_state[1]

# WHEN
channel = grpc.insecure_channel(address)
stub = fed_pb2_grpc.GrpcServiceStub(channel)

data = cloudpickle.dumps('data')
request = fed_pb2.SendDataRequest(
data=data,
upstream_seq_id=str(1),
downstream_seq_id=str(2),
job_name='job2',
)
response = stub.SendData(request)

obj = agg_fn.party("bob").remote(obj_alice_f, obj_bob_f)
fed.get(obj)
fed.shutdown()
ray.shutdown()
import time
# THEN
assert response.code == 417
assert "JobName mis-match" in response.result

# Wait for SIGTERM as failure on sending.
time.sleep(86400)
ray.shutdown()


def test_ignore_other_job_msg():
p_alice = multiprocessing.Process(target=run, args=('alice', 'job1'))
p_bob = multiprocessing.Process(target=run, args=('bob', 'job2'))
p_alice = multiprocessing.Process(target=run)
p_alice.start()
p_bob.start()
p_alice.join()
p_bob.join()
assert p_alice.exitcode == 0


if __name__ == "__main__":
Expand Down
96 changes: 17 additions & 79 deletions fed/tests/multi-jobs/test_multi_proxy_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,84 +12,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing

import grpc
import multiprocessing
import pytest
import ray

import fed
import fed._private.compatible_utils as compatible_utils
import fed.utils as fed_utils
from fed.proxy.grpc.grpc_proxy import GrpcSenderProxy, send_data_grpc

if compatible_utils._compare_version_strings(
fed_utils.get_package_version('protobuf'), '4.0.0'
):
from fed.grpc.pb4 import fed_pb2_grpc as fed_pb2_grpc
else:
from fed.grpc.pb3 import fed_pb2_grpc as fed_pb2_grpc


class TestGrpcSenderProxy(GrpcSenderProxy):
async def send(self, dest_party, data, upstream_seq_id, downstream_seq_id):
dest_addr = self._addresses[dest_party]
grpc_metadata, grpc_channel_options = self.get_grpc_config_by_party(dest_party)
if dest_party not in self._stubs:
channel = grpc.aio.insecure_channel(dest_addr, options=grpc_channel_options)
stub = fed_pb2_grpc.GrpcServiceStub(channel)
self._stubs[dest_party] = stub

timeout = self._proxy_config.timeout_in_ms / 1000
response: str = await send_data_grpc(
data=data,
stub=self._stubs[dest_party],
upstream_seq_id=upstream_seq_id,
downstream_seq_id=downstream_seq_id,
job_name=self._job_name,
timeout=timeout,
metadata=grpc_metadata,
)
assert response.code == 417
assert "JobName mis-match" in response.result
# So that process can exit
raise RuntimeError(response.result)

from fed.proxy.barriers import receiver_proxy_actor_name, sender_proxy_actor_name
from fed.proxy.grpc.grpc_proxy import GrpcSenderProxy

@fed.remote
class MyActor:
def __init__(self, party, data):
self.__data = data
self._party = party

def f(self):
return f"f({self._party}, ip is {ray.util.get_node_ip_address()})"


@fed.remote
def agg_fn(obj1, obj2):
return f"agg-{obj1}-{obj2}"


addresses = {
'job1': {
'alice': '127.0.0.1:11012',
'bob': '127.0.0.1:11011',
},
'job2': {
'alice': '127.0.0.1:12012',
'bob': '127.0.0.1:12011',
},
}


def run(party, job_name):
ray.init(address='local')
def run():
job_name = 'job_test'
ray.init(address='local', include_dashboard=False)
fed.init(
addresses=addresses[job_name],
party=party,
addresses={
'alice': '127.0.0.1:11012',
},
party='alice',
job_name=job_name,
sender_proxy_cls=TestGrpcSenderProxy,
sender_proxy_cls=GrpcSenderProxy,
config={
'cross_silo_comm': {
'exit_on_sending_failure': True,
Expand All @@ -99,22 +41,18 @@ def run(party, job_name):
},
)

sender_proxy_actor_name = f"SenderProxyActor_{job_name}"
receiver_proxy_actor_name = f"ReceiverProxyActor_{job_name}"
assert ray.get_actor(sender_proxy_actor_name)
assert ray.get_actor(receiver_proxy_actor_name)
assert ray.get_actor(sender_proxy_actor_name())
assert ray.get_actor(receiver_proxy_actor_name())

fed.shutdown()
ray.shutdown()


def test_multi_proxy_actor():
p_alice_job1 = multiprocessing.Process(target=run, args=('alice', 'job1'))
p_alice_job2 = multiprocessing.Process(target=run, args=('alice', 'job2'))
p_alice_job1.start()
p_alice_job2.start()
p_alice_job1.join()
p_alice_job2.join()
p_alice = multiprocessing.Process(target=run)
p_alice.start()
p_alice.join()
assert p_alice.exitcode == 0


if __name__ == "__main__":
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ def pass_arg(d):


def run(party):
compatible_utils.init_ray(address='local')
compatible_utils.init_ray(address='local', include_dashboard=False)
addresses = {
'alice': '127.0.0.1:11012',
'bob': '127.0.0.1:11011',
'alice': '127.0.0.1:11355',
'bob': '127.0.0.1:11356',
}
allowed_list = {
"numpy.core.numeric": ["*"],
Expand Down
2 changes: 1 addition & 1 deletion fed/tests/simple_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def agg_fn(obj1, obj2):


def run(party):
ray.init(address='local')
ray.init(address='local', include_dashboard=False)
fed.init(addresses=addresses, party=party)
print(f"Running the script in party {party}")

Expand Down
4 changes: 2 additions & 2 deletions fed/tests/test_exit_on_failure_sending.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def get_value(self):
def run(party: str, q: multiprocessing.Queue):
compatible_utils.init_ray(address='local')
addresses = {
'alice': '127.0.0.1:11012',
'bob': '127.0.0.1:11011',
'alice': '127.0.0.1:21321',
'bob': '127.0.0.1:21322',
}
retry_policy = {
"maxAttempts": 2,
Expand Down
Loading

0 comments on commit bbc4bc2

Please sign in to comment.