Skip to content

Commit

Permalink
disable grpc reuse port (#126)
Browse files Browse the repository at this point in the history
The semantics of RecverProxyActor's `is_ready` have changed: it now reports whether the actor's gRPC server is successfully listening on the specified port. Failing to listen on the port raises an AssertionError.

Signed-off-by: paer <chenqixiang.cqx@antgroup.com>

---------

Signed-off-by: paer <chenqixiang.cqx@antgroup.com>
Co-authored-by: paer <chenqixiang.cqx@antgroup.com>
  • Loading branch information
NKcqx and paer authored Jun 20, 2023
1 parent 8eaab16 commit 4c7daf4
Show file tree
Hide file tree
Showing 21 changed files with 108 additions and 55 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ def main(party):
ray.init(address='local')

cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}
fed.init(cluster=cluster, party=party)
```
This first declares a two-party cluster, whose addresses corresponding to '127.0.0.1:11010' in 'alice' and '127.0.0.1:11011' in 'bob'.
This first declares a two-party cluster whose addresses are '127.0.0.1:11012' for 'alice' and '127.0.0.1:11011' for 'bob'.
Then, `fed.init` creates a cluster in the specified party.
Note that `fed.init` should be called twice, passing in a different party each time.

Expand Down Expand Up @@ -146,7 +146,7 @@ def main(party):
ray.init(address='local')

cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}
fed.init(cluster=cluster, party=party)
Expand Down
1 change: 1 addition & 0 deletions fed/_private/grpc_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,5 @@ def get_grpc_options(
}
),
),
('grpc.so_reuseport', 0),
]
40 changes: 28 additions & 12 deletions fed/barriers.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ async def SendData(self, request, context):


async def _run_grpc_server(
port, event, all_data, party, lock, tls_config=None, grpc_options=None
port, event, all_data, party, lock,
server_ready_future, tls_config=None, grpc_options=None
):
server = grpc.aio.server(options=grpc_options)
fed_pb2_grpc.add_GrpcServiceServicer_to_server(
Expand All @@ -110,11 +111,13 @@ async def _run_grpc_server(
else:
server.add_insecure_port(f'[::]:{port}')

msg = f"Succeeded to add port {port}."
await server.start()
logger.info(
f'Successfully start Grpc service with{"out" if not tls_enabled else ""} '
'credentials.'
)
server_ready_future.set_result((True, msg))
await server.wait_for_termination()


Expand Down Expand Up @@ -302,24 +305,36 @@ def __init__(
set_max_message_length(config.cross_silo_messages_max_size)
# Workaround the threading coordinations

# Flag to see whether grpc server starts
self._server_ready_future = asyncio.Future()

# All events for grpc waitting usage.
self._events = {} # map from (upstream_seq_id, downstream_seq_id) to event
self._all_data = {} # map from (upstream_seq_id, downstream_seq_id) to data
self._lock = threading.Lock()

async def run_grpc_server(self):
return await _run_grpc_server(
self._listen_addr[self._listen_addr.index(':') + 1 :],
self._events,
self._all_data,
self._party,
self._lock,
self._tls_config,
get_grpc_options(self.retry_policy),
)
try:
port = self._listen_addr[self._listen_addr.index(':') + 1 :]
await _run_grpc_server(
port,
self._events,
self._all_data,
self._party,
self._lock,
self._server_ready_future,
self._tls_config,
get_grpc_options(self.retry_policy),
)
except RuntimeError as err:
msg = f'Grpc server failed to listen to port: {port}' \
f' Try another port by setting `listen_addr` into `cluster` config' \
f' when calling `fed.init`. Grpc error msg: {err}'
self._server_ready_future.set_result((False, msg))

async def is_ready(self):
return True
await self._server_ready_future
return self._server_ready_future.result()

async def get_data(self, src_aprty, upstream_seq_id, curr_seq_id):
self._stats["receive_op_count"] += 1
Expand Down Expand Up @@ -376,7 +391,8 @@ def start_recv_proxy(
retry_policy=retry_policy,
)
recver_proxy_actor.run_grpc_server.remote()
assert ray.get(recver_proxy_actor.is_ready.remote())
server_state = ray.get(recver_proxy_actor.is_ready.remote())
assert server_state[0], server_state[1]
logger.info("RecverProxy was successfully created.")


Expand Down
2 changes: 1 addition & 1 deletion tests/serializations_tests/test_unpickle_with_whitelist.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def pass_arg(d):
def run(party):
compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}
allowed_list = {
Expand Down
2 changes: 1 addition & 1 deletion tests/simple_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def agg_fn(obj1, obj2):


cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}

Expand Down
2 changes: 1 addition & 1 deletion tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
def run():
compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
}
fed.init(cluster=cluster, party="alice")
config = fed_config.get_cluster_config()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_async_startup_2_clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def _run(party: str):

compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}
fed.init(cluster=cluster, party=party)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_basic_pass_fed_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def get_value(self):
def run(party, is_inner_party):
compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}
fed.init(cluster=cluster, party=party)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cache_fed_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def g(x, index):
def run(party):
compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}
fed.init(cluster=cluster, party=party)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_enable_tls_across_parties.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _run(party: str):
}

cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}
fed.init(cluster=cluster, party=party, tls_config=cert_config)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_exit_on_failure_sending.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def run(party, is_inner_party):

compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}
retry_policy = {
Expand Down
2 changes: 1 addition & 1 deletion tests/test_fed_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def mean(x, y):
def run(party):
compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}
fed.init(cluster=cluster, party=party)
Expand Down
5 changes: 3 additions & 2 deletions tests/test_grpc_options_on_proxies.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ def dummpy():
def run(party):
compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11010'},
'bob': {'address': '127.0.0.1:11011'},
'alice': {'address': '127.0.0.1:11019'},
'bob': {'address': '127.0.0.1:11018'},
}
fed.init(
cluster=cluster,
Expand All @@ -40,6 +40,7 @@ def _assert_on_proxy(proxy_actor):
options = ray.get(proxy_actor._get_grpc_options.remote())
assert options[0][0] == "grpc.max_send_message_length"
assert options[0][1] == 100
assert ('grpc.so_reuseport', 0) in options

send_proxy = ray.get_actor("SendProxyActor")
recver_proxy = ray.get_actor(f"RecverProxyActor-{party}")
Expand Down
79 changes: 57 additions & 22 deletions tests/test_listen_addr.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,28 @@ def get_value(self):
return self._value


def run(party, is_inner_party):
compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11010', 'listen_addr': '0.0.0.0:11010'},
'bob': {'address': '127.0.0.1:11011', 'listen_addr': '0.0.0.0:11011'},
}
fed.init(cluster=cluster, party=party)

o = f.party("alice").remote()
actor_location = "alice" if is_inner_party else "bob"
my = My.party(actor_location).remote(o)
val = my.get_value.remote()
result = fed.get(val)
assert result == 100
assert fed.get(o) == 100
import time

time.sleep(5)
fed.shutdown()
ray.shutdown()


def test_listen_addr():
def run(party, is_inner_party):
compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11012', 'listen_addr': '0.0.0.0:11012'},
'bob': {'address': '127.0.0.1:11011', 'listen_addr': '0.0.0.0:11011'},
}
fed.init(cluster=cluster, party=party)

o = f.party("alice").remote()
actor_location = "alice" if is_inner_party else "bob"
my = My.party(actor_location).remote(o)
val = my.get_value.remote()
result = fed.get(val)
assert result == 100
assert fed.get(o) == 100
import time

time.sleep(5)
fed.shutdown()
ray.shutdown()

p_alice = multiprocessing.Process(target=run, args=('alice', True))
p_bob = multiprocessing.Process(target=run, args=('bob', True))
p_alice.start()
Expand All @@ -66,6 +65,42 @@ def test_listen_addr():
assert p_alice.exitcode == 0 and p_bob.exitcode == 0


def test_listen_used_addr():
def run(party):
import socket

compatible_utils.init_ray(address='local')
occupied_port = 11020
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# Pre-occuping the port
s.bind(("localhost", occupied_port))

cluster = {
'alice': {
'address': '127.0.0.1:11012',
'listen_addr': f'0.0.0.0:{occupied_port}'},
'bob': {
'address': '127.0.0.1:11011',
'listen_addr': '0.0.0.0:11011'},
}

# Starting grpc server on an used port will cause AssertionError
with pytest.raises(AssertionError):
fed.init(cluster=cluster, party=party)

import time

time.sleep(5)
s.close()
fed.shutdown()
ray.shutdown()

p_alice = multiprocessing.Process(target=run, args=('alice',))
p_alice.start()
p_alice.join()
assert p_alice.exitcode == 0


if __name__ == "__main__":
import sys

Expand Down
2 changes: 1 addition & 1 deletion tests/test_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def bar(x):
def run(party):
compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}
fed.init(cluster=cluster, party=party)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_pass_fed_objects_in_containers_in_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def bar(self, li):


cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def bar(li):
def run(party):
compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}
fed.init(cluster=cluster, party=party)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_ping_others.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@


cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}

Expand Down
2 changes: 1 addition & 1 deletion tests/test_repeat_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def bar(self, li):


cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}

Expand Down
2 changes: 1 addition & 1 deletion tests/test_reset_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest

cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}

Expand Down
2 changes: 1 addition & 1 deletion tests/test_retry_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def get_value(self):
def run(party, is_inner_party):
compatible_utils.init_ray(address='local')
cluster = {
'alice': {'address': '127.0.0.1:11010'},
'alice': {'address': '127.0.0.1:11012'},
'bob': {'address': '127.0.0.1:11011'},
}
retry_policy = {
Expand Down

0 comments on commit 4c7daf4

Please sign in to comment.