Skip to content

Commit

Permalink
Add cross tests for append_sub_pool & listen_elastic_ip
Browse files Browse the repository at this point in the history
  • Loading branch information
frostyplanet committed Jul 30, 2024
1 parent f0ef763 commit 112fd2e
Showing 1 changed file with 108 additions and 1 deletion.
109 changes: 108 additions & 1 deletion python/xoscar/backends/indigen/tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ async def test_ucx_elastic_ip():


@pytest.mark.asyncio
async def test_append_sub_pool():
async def test_append_sub_pool_multiprocess():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
Expand All @@ -1240,6 +1240,7 @@ async def test_append_sub_pool():
# test add a new sub pool
sub_external_address = await pool.append_sub_pool(env={"foo": "bar"})
assert sub_external_address is not None
assert sub_external_address.startswith("127.0.0.1:")

config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 4
Expand Down Expand Up @@ -1270,6 +1271,112 @@ def test():
await ref.test()


@pytest.mark.asyncio
async def test_append_sub_pool_multi_process_elastic_ip():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
else None
)
pool = await create_actor_pool( # type: ignore
"111.111.111.111",
pool_cls=MainActorPool,
n_process=2,
subprocess_start_method=start_method,
extra_conf={"listen_elastic_ip": True},
)

async with pool:
config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 3

# test add a new sub pool
sub_external_address = await pool.append_sub_pool(env={"foo": "bar"})
assert sub_external_address is not None
assert sub_external_address.startswith("111.111.111.111:")

config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 4
process_index = config.get_process_indexes()[-1]
sub_config = config.get_pool_config(process_index)
assert sub_config["external_address"][0] == sub_external_address
assert sub_config["env"] is not None
assert sub_config["env"].get("foo", None) == "bar"

class DummyActor(Actor):
@staticmethod
def test():
return "this is dummy!"

ref = await create_actor(DummyActor, address=sub_external_address)
assert ref is not None
assert ref.address == sub_external_address
assert await ref.test() == "this is dummy!"

# test remove
await pool.remove_sub_pool(sub_external_address)
config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 3
assert process_index not in config.get_process_indexes()
with pytest.raises(KeyError):
config.get_pool_config(process_index)
with pytest.raises(Exception):
await ref.test()


@pytest.mark.asyncio
async def test_append_sub_pool_single_process_elastic_ip():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
else None
)
pool = await create_actor_pool( # type: ignore
f"111.111.111.111:{get_next_port()}",
pool_cls=MainActorPool,
n_process=0,
subprocess_start_method=start_method,
extra_conf={"listen_elastic_ip": True},
)

async with pool:
config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 1

# test add a new sub pool
sub_external_address = await pool.append_sub_pool(env={"foo": "bar"})
assert sub_external_address is not None
assert sub_external_address.startswith("111.111.111.111:")

config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 2
process_index = config.get_process_indexes()[-1]
sub_config = config.get_pool_config(process_index)
assert sub_config["external_address"][0] == sub_external_address
assert sub_config["env"] is not None
assert sub_config["env"].get("foo", None) == "bar"

class DummyActor(Actor):
@staticmethod
def test():
return "this is dummy!"

ref = await create_actor(DummyActor, address=sub_external_address)
assert ref is not None
assert ref.address == sub_external_address
assert await ref.test() == "this is dummy!"

# test remove
await pool.remove_sub_pool(sub_external_address)
config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 1
assert process_index not in config.get_process_indexes()
with pytest.raises(KeyError):
config.get_pool_config(process_index)
with pytest.raises(Exception):
await ref.test()


@pytest.mark.asyncio
async def test_test_pool_append_sub_pool():
pool = await create_actor_pool( # type: ignore
Expand Down

0 comments on commit 112fd2e

Please sign in to comment.