Skip to content

Commit

Permalink
Add test cases combine ucx/ipv6/listen_elastic_ip
Browse files Browse the repository at this point in the history
  • Loading branch information
frostyplanet committed Jul 14, 2024
1 parent 127fab2 commit 8eaf728
Showing 1 changed file with 209 additions and 0 deletions.
209 changes: 209 additions & 0 deletions python/xoscar/backends/indigen/tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,184 @@ async def test_create_actor_pool():
assert len(global_router._mapping) == 0


@pytest.mark.asyncio
async def test_create_actor_pool_elastic_ip():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
else None
)
addr = f"111.111.111.111:{get_next_port()}"
pool = await create_actor_pool(
addr,
pool_cls=MainActorPool,
n_process=0,
subprocess_start_method=start_method,
extra_conf={"listen_elastic_ip": True},
)
async with pool:
# test global router
global_router = Router.get_instance()
# global router should not be the identical one with pool's router
assert global_router is not pool.router
assert pool.external_address in global_router._curr_external_addresses
assert pool.external_address in global_router._mapping
assert pool.external_address == addr

ctx = get_context()

# actor on main pool
actor_ref = await ctx.create_actor(
TestActor, uid="test-1", address=pool.external_address
)
assert await actor_ref.add(3) == 3
await ctx.destroy_actor(actor_ref)
assert (await ctx.has_actor(actor_ref)) is False

assert pool.stopped
# after pool shutdown, global router must has been cleaned
global_router = Router.get_instance()
assert len(global_router._curr_external_addresses) == 0
assert len(global_router._mapping) == 0


@pytest.mark.asyncio
async def test_create_actor_pool_fix_all_zero_ip():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
else None
)
port = get_next_port()
addr = f"0.0.0.0:{port}"
pool = await create_actor_pool(
addr,
pool_cls=MainActorPool,
n_process=0,
subprocess_start_method=start_method,
)
async with pool:
# test global router
global_router = Router.get_instance()
# global router should not be the identical one with pool's router
assert global_router is not pool.router
assert pool.external_address in global_router._curr_external_addresses
assert pool.external_address in global_router._mapping
assert pool.external_address == addr

ctx = get_context()

# actor on main pool
actor_ref = await ctx.create_actor(
TestActor, uid="test-1", address=pool.external_address
)
assert await actor_ref.add(3) == 3
connect_addr = f"127.0.0.1:{port}"
actor_ref2 = await ctx.actor_ref(address=connect_addr, uid="test-1")
# test fix_all_zero_ip, the result is not 0.0.0.0
assert actor_ref2.address == connect_addr
assert await actor_ref.add(3) == 6

await ctx.destroy_actor(actor_ref)
assert (await ctx.has_actor(actor_ref)) is False

assert pool.stopped
# after pool shutdown, global router must has been cleaned
global_router = Router.get_instance()
assert len(global_router._curr_external_addresses) == 0
assert len(global_router._mapping) == 0


@pytest.mark.asyncio
async def test_create_actor_pool_ipv6():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
else None
)
port = get_next_port()
addr = f":::{port}"
pool = await create_actor_pool(
addr,
pool_cls=MainActorPool,
n_process=0,
subprocess_start_method=start_method,
)
async with pool:
# test global router
global_router = Router.get_instance()
# global router should not be the identical one with pool's router
assert global_router is not pool.router
assert pool.external_address in global_router._curr_external_addresses
assert pool.external_address in global_router._mapping
assert pool.external_address == addr

ctx = get_context()

# actor on main pool
actor_ref = await ctx.create_actor(
TestActor, uid="test-1", address=pool.external_address
)
assert await actor_ref.add(3) == 3
await ctx.destroy_actor(actor_ref)
assert (await ctx.has_actor(actor_ref)) is False

assert pool.stopped
# after pool shutdown, global router must has been cleaned
global_router = Router.get_instance()
assert len(global_router._curr_external_addresses) == 0
assert len(global_router._mapping) == 0


@pytest.mark.asyncio
async def test_create_actor_pool_ipv6_elastic_ip():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
else None
)
port = get_next_port()
# ip not exists on local host
addr = f"FFFF:34::55::1:{port}"
pool = await create_actor_pool(
addr,
pool_cls=MainActorPool,
n_process=0,
subprocess_start_method=start_method,
extra_conf={"listen_elastic_ip": True},
)
async with pool:
# test global router
global_router = Router.get_instance()
# global router should not be the identical one with pool's router
assert global_router is not pool.router
assert pool.external_address in global_router._curr_external_addresses
assert pool.external_address in global_router._mapping
assert pool.external_address == addr

ctx = get_context()
# actor on main pool
actor_ref = await ctx.create_actor(
TestActor, uid="test-1", address=pool.external_address
)
# test fix_all_zero_ip, the result is not :::port
assert actor_ref.address == addr
assert await actor_ref.add(3) == 3
connect_addr = f"::1:{port}"
actor_ref2 = await ctx.actor_ref(address=connect_addr, uid="test-1")
assert await actor_ref2.add(4) == 7
assert actor_ref2.address == addr

await ctx.destroy_actor(actor_ref)
assert (await ctx.has_actor(actor_ref)) is False

assert pool.stopped
# after pool shutdown, global router must has been cleaned
global_router = Router.get_instance()
assert len(global_router._curr_external_addresses) == 0
assert len(global_router._mapping) == 0


@pytest.mark.asyncio
async def test_errors():
with pytest.raises(ValueError):
Expand Down Expand Up @@ -1010,6 +1188,37 @@ async def test_ucx(enable_internal_addr: bool):
assert await ref1.foo(ref2, 3) == 6


@require_ucx
@pytest.mark.asyncio
async def test_ucx_elastic_ip():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
else None
)
port = get_next_port()
addr = f"111.111.111.111:{port}"
pool = await create_actor_pool( # type: ignore
addr,
pool_cls=MainActorPool,
n_process=0,
subprocess_start_method=start_method,
external_address_schemes=["ucx"],
extra_conf={"listen_elastic_ip": True},
)

async with pool:
ctx = get_context()
ref1 = await ctx.create_actor(
TestUCXActor, init_val=0, address=pool.external_address, uid="test-ucx"
)
assert ref1.address == "ucx://" + addr
ref2 = await ctx.actor_ref(address=f"ucx://127.0.0.1:{port}", uid="test-ucx")
assert await ref2.add(1) == 1
assert await ref1.add(2) == 2
assert ref2.address == "ucx://" + addr


@pytest.mark.asyncio
async def test_append_sub_pool():
start_method = (
Expand Down

0 comments on commit 8eaf728

Please sign in to comment.