Skip to content

Commit

Permalink
Fix CI
Browse files Browse the repository at this point in the history
Signed-off-by: Qing Wang <kingchin1218@gmail.com>
  • Loading branch information
jovany-wang committed Jul 20, 2023
1 parent d566e5a commit c216f71
Showing 1 changed file with 11 additions and 62 deletions.
73 changes: 11 additions & 62 deletions tests/test_listening_address.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,13 @@ def run(party, is_inner_party):
'alice': '127.0.0.1:11012',
'bob': '127.0.0.1:11011',
}

listening_address = '0.0.0.0:11012' if party == 'alice' else '0.0.0.0:11011'
listening_address = '0.0.0.0:11012' if party == 'alice' else '0.0.0.0:11011'
fed.init(
addresses=addresses,
party=party,
config={
"cross_silo_message": {
"listening_address": listening_address,
'cross_silo_message': {
'listening_address': listening_address
}
},
)
Expand All @@ -66,7 +65,7 @@ def run(party, is_inner_party):
ray.shutdown()


def test_listen_addr():
def test_listening_address():
p_alice = multiprocessing.Process(target=run, args=('alice', True))
p_bob = multiprocessing.Process(target=run, args=('bob', True))
p_alice.start()
Expand All @@ -76,6 +75,7 @@ def test_listen_addr():
assert p_alice.exitcode == 0 and p_bob.exitcode == 0


def test_listen_used_address():
def run(party):
import socket

Expand All @@ -92,29 +92,21 @@ def run(party):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", occupied_port))

# addresses = {
# 'alice': {
# 'address': '127.0.0.1:11012',
# 'listen_addr': f'0.0.0.0:{occupied_port}'},
# 'bob': {
# 'address': '127.0.0.1:11011',
# 'listen_addr': '0.0.0.0:11011'},
# }
addresses = {
'alice': '127.0.0.1:11012',
'bob': '127.0.0.1:11011',
'bob': '127.0.0.1:11011',
}
listening_address = f'0.0.0.0:{occupied_port}' if party == 'alice' else '0.0.0.0:11011'

# Starting grpc server on an used port will cause AssertionError
with pytest.raises(AssertionError):
listening_address = '0.0.0.0:11012' if party == 'alice' else '0.0.0.0:11011'
fed.init(
addresses=addresses,
party=party,
config={
"cross_silo_message": {
"listening_address": listening_address,
}
'cross_silo_message': {
'listening_address': listening_address
}
},
)

Expand All @@ -125,50 +117,7 @@ def run(party):
fed.shutdown()
ray.shutdown()


def _run(party):
import socket

compatible_utils.init_ray(address='local')
occupied_port = 11020
# NOTE(NKcqx): Firstly try to bind IPv6 because the grpc server will do so.
# Otherwise this UT will false because socket bind $occupied_port
# on IPv4 address while grpc server listendn Ipv6 address.
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
# Pre-occuping the port
s.bind(("::", occupied_port))
except OSError:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", occupied_port))

addresses = {
'alice': '127.0.0.1:11012',
'bob': '127.0.0.1:11011',
}

# Starting grpc server on an used port will cause AssertionError
with pytest.raises(AssertionError):
listening_address = '0.0.0.0:11012' if party == 'alice' else '0.0.0.0:11011'
fed.init(
addresses=addresses,
party=party,
config={
"cross_silo_message": {
"listening_address": listening_address,
}
},
)

import time

time.sleep(5)
s.close()
fed.shutdown()
ray.shutdown()

def test_listen_used_addr():
p_alice = multiprocessing.Process(target=_run, args=('alice',))
p_alice = multiprocessing.Process(target=run, args=('alice',))
p_alice.start()
p_alice.join()
assert p_alice.exitcode == 0
Expand Down

0 comments on commit c216f71

Please sign in to comment.