Skip to content

Commit

Permalink
Add test cases combine ucx/ipv6/listen_elastic_ip
Browse files Browse the repository at this point in the history
  • Loading branch information
frostyplanet committed Jul 14, 2024
1 parent 127fab2 commit e7ee694
Showing 1 changed file with 160 additions and 0 deletions.
160 changes: 160 additions & 0 deletions python/xoscar/backends/indigen/tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,135 @@ async def test_create_actor_pool():
assert len(global_router._mapping) == 0


@pytest.mark.asyncio
async def test_create_actor_pool_elastic_ip():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
else None
)
addr = f"111.111.111.111:{get_next_port()}"
pool = await create_actor_pool(
addr,
pool_cls=MainActorPool,
n_process=0,
subprocess_start_method=start_method,
extra_conf={"listen_elastic_ip": True},
)
async with pool:
# test global router
global_router = Router.get_instance()
# global router should not be the identical one with pool's router
assert global_router is not pool.router
assert pool.external_address in global_router._curr_external_addresses
assert pool.external_address in global_router._mapping
assert pool.external_address == addr

ctx = get_context()

# actor on main pool
actor_ref = await ctx.create_actor(
TestActor, uid="test-1", address=pool.external_address
)
assert await actor_ref.add(3) == 3
await ctx.destroy_actor(actor_ref)
assert (await ctx.has_actor(actor_ref)) is False

assert pool.stopped
# after pool shutdown, global router must has been cleaned
global_router = Router.get_instance()
assert len(global_router._curr_external_addresses) == 0
assert len(global_router._mapping) == 0


@pytest.mark.asyncio
async def test_create_actor_pool_fix_all_zero_ip():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
else None
)
port = get_next_port()
addr = f"0.0.0.0:{port}"
pool = await create_actor_pool(
addr,
pool_cls=MainActorPool,
n_process=0,
subprocess_start_method=start_method,
)
async with pool:
# test global router
global_router = Router.get_instance()
# global router should not be the identical one with pool's router
assert global_router is not pool.router
assert pool.external_address in global_router._curr_external_addresses
assert pool.external_address in global_router._mapping
assert pool.external_address == addr

ctx = get_context()

# actor on main pool
actor_ref = await ctx.create_actor(
TestActor, uid="test-1", address=pool.external_address
)
assert await actor_ref.add(3) == 3
connect_addr = f"127.0.0.1:{port}"
actor_ref2 = await ctx.actor_ref(address=connect_addr, uid="test-1")
# test fix_all_zero_ip, the result is not 0.0.0.0
assert actor_ref2.address == connect_addr
assert await actor_ref.add(3) == 6

await ctx.destroy_actor(actor_ref)
assert (await ctx.has_actor(actor_ref)) is False

assert pool.stopped
# after pool shutdown, global router must has been cleaned
global_router = Router.get_instance()
assert len(global_router._curr_external_addresses) == 0
assert len(global_router._mapping) == 0


@pytest.mark.asyncio
async def test_create_actor_pool_ipv6():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
else None
)
port = get_next_port()
addr = f":::{port}"
pool = await create_actor_pool(
addr,
pool_cls=MainActorPool,
n_process=0,
subprocess_start_method=start_method,
)
async with pool:
# test global router
global_router = Router.get_instance()
# global router should not be the identical one with pool's router
assert global_router is not pool.router
assert pool.external_address in global_router._curr_external_addresses
assert pool.external_address in global_router._mapping
assert pool.external_address == addr

ctx = get_context()

# actor on main pool
actor_ref = await ctx.create_actor(
TestActor, uid="test-1", address=pool.external_address
)
assert await actor_ref.add(3) == 3
await ctx.destroy_actor(actor_ref)
assert (await ctx.has_actor(actor_ref)) is False

assert pool.stopped
# after pool shutdown, global router must has been cleaned
global_router = Router.get_instance()
assert len(global_router._curr_external_addresses) == 0
assert len(global_router._mapping) == 0


@pytest.mark.asyncio
async def test_errors():
with pytest.raises(ValueError):
Expand Down Expand Up @@ -1010,6 +1139,37 @@ async def test_ucx(enable_internal_addr: bool):
assert await ref1.foo(ref2, 3) == 6


@require_ucx
@pytest.mark.asyncio
async def test_ucx_elastic_ip():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
else None
)
port = get_next_port()
pool = await create_actor_pool( # type: ignore
f"111.111.111.111:{get_next_port()}",
pool_cls=MainActorPool,
n_process=0,
subprocess_start_method=start_method,
external_address_schemes=["ucx"],
extra_conf={"listen_elastic_ip": True},
)

async with pool:
ctx = get_context()
ref1 = await ctx.create_actor(
TestUCXActor,
0,
address=pool.external_address,
allocate_strategy=ProcessIndex(0),
)
assert await ref1.add(1) == 1
ref2 = await ctx.actor_ref(address=f"ucx://127.0.0.1:{port}", uid="")
assert await ref2.add(1) == 2


@pytest.mark.asyncio
async def test_append_sub_pool():
start_method = (
Expand Down

0 comments on commit e7ee694

Please sign in to comment.