diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index b1634b93..be11c32b 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,6 +1,6 @@
 repos:
   - repo: https://github.com/psf/black
-    rev: 23.1.0
+    rev: 24.2.0
     hooks:
       - id: black
         files: python/xoscar
@@ -24,7 +24,7 @@ repos:
         args: [--sp, python/setup.cfg]
         files: python/xoscar
   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: v1.4.1
+    rev: v1.9.0
    hooks:
      - id: mypy
        additional_dependencies: [tokenize-rt==3.2.0]
diff --git a/python/xoscar/backends/communication/dummy.py b/python/xoscar/backends/communication/dummy.py
index 7a05a129..7909b24e 100644
--- a/python/xoscar/backends/communication/dummy.py
+++ b/python/xoscar/backends/communication/dummy.py
@@ -97,9 +97,9 @@ class DummyServer(Server):
         else tuple()
     )
 
-    _address_to_instances: weakref.WeakValueDictionary[
-        str, "DummyServer"
-    ] = weakref.WeakValueDictionary()
+    _address_to_instances: weakref.WeakValueDictionary[str, "DummyServer"] = (
+        weakref.WeakValueDictionary()
+    )
     _channels: list[ChannelType]
     _tasks: list[asyncio.Task]
     scheme: str | None = "dummy"
diff --git a/python/xoscar/backends/indigen/pool.py b/python/xoscar/backends/indigen/pool.py
index dd9a08ef..045c77a7 100644
--- a/python/xoscar/backends/indigen/pool.py
+++ b/python/xoscar/backends/indigen/pool.py
@@ -187,14 +187,19 @@ async def start_sub_pool(
         def start_pool_in_process():
             ctx = multiprocessing.get_context(method=start_method)
             status_queue = ctx.Queue()
+            main_pool_pid = os.getpid()
 
             with _suspend_init_main():
                 process = ctx.Process(
                     target=cls._start_sub_pool,
-                    args=(actor_pool_config, process_index, status_queue),
+                    args=(
+                        actor_pool_config,
+                        process_index,
+                        status_queue,
+                        main_pool_pid,
+                    ),
                     name=f"IndigenActorPool{process_index}",
                 )
-                process.daemon = True
                 process.start()
 
             # wait for sub actor pool to finish starting
@@ -209,15 +214,22 @@ def start_pool_in_process():
 
     @classmethod
     async def wait_sub_pools_ready(cls, create_pool_tasks: List[asyncio.Task]):
-        processes = []
+        processes: list[multiprocessing.Process] = []
         ext_addresses = []
+        error = None
         for task in create_pool_tasks:
             process, status = await task
+            processes.append(process)
             if status.status == 1:
                 # start sub pool failed
-                raise status.error.with_traceback(status.traceback)
-            processes.append(process)
-            ext_addresses.append(status.external_addresses)
+                error = status.error.with_traceback(status.traceback)
+            else:
+                ext_addresses.append(status.external_addresses)
+        if error:
+            for p in processes:
+                # error happens, kill all subprocesses
+                p.kill()
+            raise error
         return processes, ext_addresses
 
     @classmethod
@@ -226,6 +238,7 @@ def _start_sub_pool(
         actor_config: ActorPoolConfig,
         process_index: int,
         status_queue: multiprocessing.Queue,
+        main_pool_pid: int,
     ):
         ensure_coverage()
 
@@ -259,7 +272,9 @@ def _start_sub_pool(
         else:
             asyncio.set_event_loop(asyncio.new_event_loop())
 
-        coro = cls._create_sub_pool(actor_config, process_index, status_queue)
+        coro = cls._create_sub_pool(
+            actor_config, process_index, status_queue, main_pool_pid
+        )
         asyncio.run(coro)
 
     @classmethod
@@ -268,6 +283,7 @@ async def _create_sub_pool(
         actor_config: ActorPoolConfig,
         process_index: int,
         status_queue: multiprocessing.Queue,
+        main_pool_pid: int,
     ):
         process_status = None
         try:
@@ -276,7 +292,11 @@ async def _create_sub_pool(
             if env:
                 os.environ.update(env)
             pool = await SubActorPool.create(
-                {"actor_pool_config": actor_config, "process_index": process_index}
+                {
+                    "actor_pool_config": actor_config,
+                    "process_index": process_index,
+                    "main_pool_pid": main_pool_pid,
+                }
             )
             external_addresses = cur_pool_config["external_address"]
             process_status = SubpoolStatus(
@@ -342,14 +362,14 @@ async def append_sub_pool(
         def start_pool_in_process():
             ctx = multiprocessing.get_context(method=start_method)
             status_queue = ctx.Queue()
+            main_pool_pid = os.getpid()
 
             with _suspend_init_main():
                 process = ctx.Process(
                     target=self._start_sub_pool,
-                    args=(self._config, process_index, status_queue),
+                    args=(self._config, process_index, status_queue, main_pool_pid),
                     name=f"IndigenActorPool{process_index}",
                 )
-                process.daemon = True
                 process.start()
 
             # wait for sub actor pool to finish starting
diff --git a/python/xoscar/backends/indigen/tests/test_pool.py b/python/xoscar/backends/indigen/tests/test_pool.py
index a79a5d3d..96881671 100644
--- a/python/xoscar/backends/indigen/tests/test_pool.py
+++ b/python/xoscar/backends/indigen/tests/test_pool.py
@@ -17,12 +17,14 @@
 
 import asyncio
 import logging
+import multiprocessing
 import os
 import re
 import sys
 import time
 from unittest import mock
 
+import psutil
 import pytest
 
 from .... import Actor, create_actor, create_actor_ref, get_pool_config, kill_actor
@@ -1099,3 +1101,80 @@ def test():
     assert process_index not in config.get_process_indexes()
     with pytest.raises(KeyError):
         config.get_pool_config(process_index)
+
+
+async def _run(started: multiprocessing.Event):  # type: ignore
+    pool = await create_actor_pool(  # type: ignore
+        "127.0.0.1", pool_cls=MainActorPool, n_process=1
+    )
+
+    class DummyActor(Actor):
+        @staticmethod
+        def test():
+            return "this is dummy!"
+
+    ref = await create_actor(
+        DummyActor, address=pool.external_address, allocate_strategy=RandomSubPool()
+    )
+    assert ref is not None
+
+    started.set()  # type: ignore
+    await pool.join()
+
+
+def _run_in_process(started: multiprocessing.Event):  # type: ignore
+    asyncio.run(_run(started))
+
+
+@pytest.mark.asyncio
+async def test_sub_pool_quit_with_main_pool():
+    s = multiprocessing.Event()
+    p = multiprocessing.Process(target=_run_in_process, args=(s,))
+    p.start()
+    s.wait()
+
+    processes = psutil.Process(p.pid).children()
+    assert len(processes) == 1
+
+    # kill main process
+    p.kill()
+    p.join()
+    await asyncio.sleep(1)
+
+    # subprocess should have died
+    assert not psutil.pid_exists(processes[0].pid)
+
+
+def _add(x: int) -> int:
+    return x + 1
+
+
+class _ProcessActor(Actor):
+    def run(self, x: int):
+        p = multiprocessing.Process(target=_add, args=(x,))
+        p.start()
+        p.join()
+        return x + 1
+
+
+@pytest.mark.asyncio
+async def test_process_in_actor():
+    start_method = (
+        os.environ.get("POOL_START_METHOD", "forkserver")
+        if sys.platform != "win32"
+        else None
+    )
+    pool = await create_actor_pool(  # type: ignore
+        "127.0.0.1",
+        pool_cls=MainActorPool,
+        n_process=1,
+        subprocess_start_method=start_method,
+    )
+
+    async with pool:
+        ref = await create_actor(
+            _ProcessActor,
+            address=pool.external_address,
+            allocate_strategy=RandomSubPool(),
+        )
+        assert 2 == await ref.run(1)
diff --git a/python/xoscar/backends/pool.py b/python/xoscar/backends/pool.py
index 749ef8cc..16d27c6f 100644
--- a/python/xoscar/backends/pool.py
+++ b/python/xoscar/backends/pool.py
@@ -27,6 +27,8 @@
 from abc import ABC, ABCMeta, abstractmethod
 from typing import Any, Callable, Coroutine, Optional, Type, TypeVar
 
+import psutil
+
 from .._utils import TypeDispatcher, create_actor_ref, to_binary
 from ..api import Actor
 from ..core import ActorRef, BufferRef, FileObjectRef, register_local_pool
@@ -821,7 +823,8 @@ def handle_channel(channel):
 
 
 class SubActorPoolBase(ActorPoolBase):
-    __slots__ = ("_main_address",)
+    __slots__ = ("_main_address", "_watch_main_pool_task")
+    _watch_main_pool_task: Optional[asyncio.Task]
 
     def __init__(
         self,
@@ -834,6 +837,7 @@ def __init__(
         config: ActorPoolConfig,
         servers: list[Server],
         main_address: str,
+        main_pool_pid: Optional[int],
     ):
         super().__init__(
             process_index,
@@ -846,6 +850,26 @@ def __init__(
             servers,
         )
         self._main_address = main_address
+        if main_pool_pid:
+            self._watch_main_pool_task = asyncio.create_task(
+                self._watch_main_pool(main_pool_pid)
+            )
+        else:
+            self._watch_main_pool_task = None
+
+    async def _watch_main_pool(self, main_pool_pid: int):
+        main_process = psutil.Process(main_pool_pid)
+        while not self.stopped:
+            try:
+                await asyncio.to_thread(main_process.status)
+                await asyncio.sleep(0.1)
+                continue
+            except (psutil.NoSuchProcess, ProcessLookupError, asyncio.CancelledError):
+                # main pool died
+                break
+
+        if not self.stopped:
+            await self.stop()
 
     async def notify_main_pool_to_destroy(
         self, message: DestroyActorMessage
@@ -900,14 +924,22 @@ async def handle_control_command(
 
     @staticmethod
     def _parse_config(config: dict, kw: dict) -> dict:
+        main_pool_pid = config.pop("main_pool_pid", None)
         kw = AbstractActorPool._parse_config(config, kw)
         pool_config: ActorPoolConfig = kw["config"]
         main_process_index = pool_config.get_process_indexes()[0]
         kw["main_address"] = pool_config.get_pool_config(main_process_index)[
             "external_address"
         ][0]
+        kw["main_pool_pid"] = main_pool_pid
         return kw
 
+    async def stop(self):
+        await super().stop()
+        if self._watch_main_pool_task:
+            self._watch_main_pool_task.cancel()
+            await self._watch_main_pool_task
+
 
 class MainActorPoolBase(ActorPoolBase):
     __slots__ = (
diff --git a/python/xoscar/backends/test/pool.py b/python/xoscar/backends/test/pool.py
index d0892eab..ae6fe77f 100644
--- a/python/xoscar/backends/test/pool.py
+++ b/python/xoscar/backends/test/pool.py
@@ -55,7 +55,7 @@ async def start_sub_pool(
         status_queue: multiprocessing.Queue = multiprocessing.Queue()
         return (
             asyncio.create_task(
-                cls._create_sub_pool(actor_pool_config, process_index, status_queue)
+                cls._create_sub_pool(actor_pool_config, process_index, status_queue, 0)
             ),
             status_queue,
         )
@@ -77,9 +77,14 @@ async def _create_sub_pool(
         actor_config: ActorPoolConfig,
         process_index: int,
         status_queue: multiprocessing.Queue,
+        main_pool_pid: int,
     ):
         pool: TestSubActorPool = await TestSubActorPool.create(
-            {"actor_pool_config": actor_config, "process_index": process_index}
+            {
+                "actor_pool_config": actor_config,
+                "process_index": process_index,
+                "main_pool_pid": main_pool_pid,
+            }
         )
         await pool.start()
         status_queue.put(
diff --git a/python/xoscar/backends/test/tests/test_transfer.py b/python/xoscar/backends/test/tests/test_transfer.py
index 1ea51ab2..677ff5ff 100644
--- a/python/xoscar/backends/test/tests/test_transfer.py
+++ b/python/xoscar/backends/test/tests/test_transfer.py
@@ -120,7 +120,7 @@ async def _copy_test(scheme1: Optional[str], scheme2: Optional[str], cpu: bool):
         external_address_schemes=[None, scheme1, scheme2],
     )
 
-    async with pool:
+    async with pool, pool2:
         ctx = get_context()
 
         # actor on main pool
diff --git a/python/xoscar/collective/common.py b/python/xoscar/collective/common.py
index 59599ff3..dc039c8a 100644
--- a/python/xoscar/collective/common.py
+++ b/python/xoscar/collective/common.py
@@ -55,30 +55,30 @@ class AllReduceAlgorithm(IntEnum):
 
 
 TypeMappingGloo: Dict[Type[np.dtype], "xp.GlooDataType_t"] = {
-    np.int8: xp.GlooDataType_t.glooInt8,
-    np.uint8: xp.GlooDataType_t.glooUint8,
-    np.int32: xp.GlooDataType_t.glooInt32,
-    np.uint32: xp.GlooDataType_t.glooUint32,
-    np.int64: xp.GlooDataType_t.glooInt64,
-    np.uint64: xp.GlooDataType_t.glooUint64,
-    np.float16: xp.GlooDataType_t.glooFloat16,
-    np.float32: xp.GlooDataType_t.glooFloat32,
-    np.float64: xp.GlooDataType_t.glooFloat64,
+    np.int8: xp.GlooDataType_t.glooInt8,  # type: ignore
+    np.uint8: xp.GlooDataType_t.glooUint8,  # type: ignore
+    np.int32: xp.GlooDataType_t.glooInt32,  # type: ignore
+    np.uint32: xp.GlooDataType_t.glooUint32,  # type: ignore
+    np.int64: xp.GlooDataType_t.glooInt64,  # type: ignore
+    np.uint64: xp.GlooDataType_t.glooUint64,  # type: ignore
+    np.float16: xp.GlooDataType_t.glooFloat16,  # type: ignore
+    np.float32: xp.GlooDataType_t.glooFloat32,  # type: ignore
+    np.float64: xp.GlooDataType_t.glooFloat64,  # type: ignore
 }
 cupy = lazy_import("cupy")
 if cupy is not None:
     from cupy.cuda import nccl
 
     TypeMappingNCCL: Dict[Type[np.dtype], int] = {
-        np.int8: nccl.NCCL_INT8,
-        np.uint8: nccl.NCCL_UINT8,
-        np.int32: nccl.NCCL_INT32,
-        np.uint32: nccl.NCCL_UINT32,
-        np.int64: nccl.NCCL_INT64,
-        np.uint64: nccl.NCCL_UINT64,
-        np.float16: nccl.NCCL_FLOAT16,
-        np.float32: nccl.NCCL_FLOAT32,
-        np.float64: nccl.NCCL_FLOAT64,
+        np.int8: nccl.NCCL_INT8,  # type: ignore
+        np.uint8: nccl.NCCL_UINT8,  # type: ignore
+        np.int32: nccl.NCCL_INT32,  # type: ignore
+        np.uint32: nccl.NCCL_UINT32,  # type: ignore
+        np.int64: nccl.NCCL_INT64,  # type: ignore
+        np.uint64: nccl.NCCL_UINT64,  # type: ignore
+        np.float16: nccl.NCCL_FLOAT16,  # type: ignore
+        np.float32: nccl.NCCL_FLOAT32,  # type: ignore
+        np.float64: nccl.NCCL_FLOAT64,  # type: ignore
     }
 
     ReduceOpMappingNCCL: Dict[CollectiveReduceOp, int] = {
diff --git a/python/xoscar/collective/tests/test_core.py b/python/xoscar/collective/tests/test_core.py
index ec3c8adf..25c22230 100644
--- a/python/xoscar/collective/tests/test_core.py
+++ b/python/xoscar/collective/tests/test_core.py
@@ -57,7 +57,7 @@ async def init_process_group_without_env(self):
         await init_process_group(self._rank, self._world)
 
     async def test_params(self):
-        rank_ref: ActorRefType[RankActor] = await actor_ref(
+        rank_ref: ActorRefType[RankActor] = await actor_ref(  # type: ignore
             address=self.address, uid="RankActor"
         )
         uid = rank_ref.uid
@@ -72,7 +72,7 @@ async def test_params(self):
         backend = await rank_ref.backend()
         assert backend == "gloo"
 
-        pg: ProcessGroup = await rank_ref.process_group("default")
+        pg: ProcessGroup = await rank_ref.process_group("default")  # type: ignore
         assert pg is not None
         assert pg.rank == self._rank
 
diff --git a/python/xoscar/tests/test_batch.py b/python/xoscar/tests/test_batch.py
index 38494911..262937a8 100644
--- a/python/xoscar/tests/test_batch.py
+++ b/python/xoscar/tests/test_batch.py
@@ -222,88 +222,98 @@ def method(self, args_list, kwargs_list):
 @pytest.mark.asyncio
 async def test_no_lock():
     pool = await create_actor_pool("127.0.0.1", n_process=1)
-    addr = pool.external_address
-
-    # DummyActor calls ``test_method`` of WorkActor in ``do`` method,
-    # while WorkerActor depends on DummyActor ref to complete ``test_method``.
-    # If there is no ``no_lock`` decorator, dead lock happens.
-    class DummyActor(Actor):
-        def __init__(self):
-            super().__init__()
-            self.arg_list = []
-            self.kwarg_list = []
-            self.test_seq = []
-
-        @classmethod
-        def default_uid(cls):
-            return "DummyActor"
-
-        @extensible
-        @no_lock
-        def method(self, *args, **kwargs):
-            self.arg_list.append(tuple(a * 2 for a in args))
-            self.kwarg_list.append({k: v * 2 for k, v in kwargs.items()})
-            return len(self.kwarg_list)
-
-        @method.batch
-        @no_lock
-        def b_method(self, args_list, kwargs_list):
-            self.arg_list.extend([tuple(a * 2 + 1 for a in args) for args in args_list])
-            self.kwarg_list.extend(
-                [{k: v * 2 + 1 for k, v in kwargs.items()} for kwargs in kwargs_list]
-            )
-            return [len(self.kwarg_list)] * len(args_list)
-
-        @no_lock
-        async def no_lock_test(self, i):
-            self.test_seq.append(i)
-            await asyncio.sleep(1)
-            self.test_seq.append(i + 1)
-
-        def get_lock_test_result(self):
-            return self.test_seq
-
-        async def do(self):
-            ref = await actor_ref(address=self.address, uid=WorkerActor.default_uid())
-            await ref.test_method()
-
-        async def do_batch(self):
-            ref = await actor_ref(address=self.address, uid=WorkerActor.default_uid())
-            await ref.test_method_batch()
-
-    class WorkerActor(Actor):
-        def __init__(self):
-            super().__init__()
-
-        @classmethod
-        def default_uid(cls):
-            return "WorkerActor"
-
-        async def __post_create__(self):
-            self._dummy_ref = await actor_ref(
-                address=self.address, uid=DummyActor.default_uid()
-            )
-
-        async def test_method(self):
-            assert await self._dummy_ref.method(1, kwarg=2) == 1
-            assert getattr(DummyActor.method, NO_LOCK_ATTRIBUTE_HINT, False) is True
-
-        async def test_method_batch(self):
-            assert await self._dummy_ref.method.batch(
-                self._dummy_ref.method.delay(1, kwarg=2),
-                self._dummy_ref.method.delay(3, kwarg=4),
-            ) == [3, 3]
-
-    ref = await create_actor(DummyActor, address=addr, uid=DummyActor.default_uid())
-    await create_actor(WorkerActor, address=addr, uid=WorkerActor.default_uid())
-    await ref.do()
-    await ref.do_batch()
-
-    ref2 = await create_actor(
-        DummyActor,
-        address=next(iter(pool.sub_processes.keys())),
-        uid=DummyActor.default_uid(),
-    )
-    await asyncio.gather(ref2.no_lock_test(1), ref2.no_lock_test(3))
-    r = await ref2.get_lock_test_result()
-    assert r == [1, 3, 2, 4]
+    async with pool:
+        addr = pool.external_address
+
+        # DummyActor calls ``test_method`` of WorkActor in ``do`` method,
+        # while WorkerActor depends on DummyActor ref to complete ``test_method``.
+        # If there is no ``no_lock`` decorator, dead lock happens.
+        class DummyActor(Actor):
+            def __init__(self):
+                super().__init__()
+                self.arg_list = []
+                self.kwarg_list = []
+                self.test_seq = []
+
+            @classmethod
+            def default_uid(cls):
+                return "DummyActor"
+
+            @extensible
+            @no_lock
+            def method(self, *args, **kwargs):
+                self.arg_list.append(tuple(a * 2 for a in args))
+                self.kwarg_list.append({k: v * 2 for k, v in kwargs.items()})
+                return len(self.kwarg_list)
+
+            @method.batch
+            @no_lock
+            def b_method(self, args_list, kwargs_list):
+                self.arg_list.extend(
+                    [tuple(a * 2 + 1 for a in args) for args in args_list]
+                )
+                self.kwarg_list.extend(
+                    [
+                        {k: v * 2 + 1 for k, v in kwargs.items()}
+                        for kwargs in kwargs_list
+                    ]
+                )
+                return [len(self.kwarg_list)] * len(args_list)
+
+            @no_lock
+            async def no_lock_test(self, i):
+                self.test_seq.append(i)
+                await asyncio.sleep(1)
+                self.test_seq.append(i + 1)
+
+            def get_lock_test_result(self):
+                return self.test_seq
+
+            async def do(self):
+                ref = await actor_ref(
+                    address=self.address, uid=WorkerActor.default_uid()
+                )
+                await ref.test_method()
+
+            async def do_batch(self):
+                ref = await actor_ref(
+                    address=self.address, uid=WorkerActor.default_uid()
+                )
+                await ref.test_method_batch()
+
+        class WorkerActor(Actor):
+            def __init__(self):
+                super().__init__()
+
+            @classmethod
+            def default_uid(cls):
+                return "WorkerActor"
+
+            async def __post_create__(self):
+                self._dummy_ref = await actor_ref(
+                    address=self.address, uid=DummyActor.default_uid()
+                )
+
+            async def test_method(self):
+                assert await self._dummy_ref.method(1, kwarg=2) == 1
+                assert getattr(DummyActor.method, NO_LOCK_ATTRIBUTE_HINT, False) is True
+
+            async def test_method_batch(self):
+                assert await self._dummy_ref.method.batch(
+                    self._dummy_ref.method.delay(1, kwarg=2),
+                    self._dummy_ref.method.delay(3, kwarg=4),
+                ) == [3, 3]
+
+        ref = await create_actor(DummyActor, address=addr, uid=DummyActor.default_uid())
+        await create_actor(WorkerActor, address=addr, uid=WorkerActor.default_uid())
+        await ref.do()
+        await ref.do_batch()
+
+        ref2 = await create_actor(
+            DummyActor,
+            address=next(iter(pool.sub_processes.keys())),
+            uid=DummyActor.default_uid(),
+        )
+        await asyncio.gather(ref2.no_lock_test(1), ref2.no_lock_test(3))
+        r = await ref2.get_lock_test_result()
+        assert r == [1, 3, 2, 4]
diff --git a/python/xoscar/tests/test_generator.py b/python/xoscar/tests/test_generator.py
index 3ebb15ad..391be566 100644
--- a/python/xoscar/tests/test_generator.py
+++ b/python/xoscar/tests/test_generator.py
@@ -47,11 +47,15 @@ def get_all_generators(self):
 
     @xo.generator
     async def chat(self):
-        worker_actor: xo.ActorRef["WorkerActor"] = await xo.actor_ref(
+        worker_actor: xo.ActorRef["WorkerActor"] = await xo.actor_ref(  # type: ignore
             address=address, uid=WorkerActor.uid()
         )
         yield "sync"
-        async for x in await worker_actor.chat():  # this is much confused. I will suggest use async generators only.
+        async for (
+            x
+        ) in (
+            await worker_actor.chat()
+        ):  # this is much confused. I will suggest use async generators only.
             yield x
         yield "async"
 
@@ -96,85 +100,86 @@ def uid(cls):
 
 
 async def test_generator():
-    await xo.create_actor_pool(address, 2)
-    await xo.create_actor(WorkerActor, address=address, uid=WorkerActor.uid())
-    superivsor_actor = await xo.create_actor(
-        SupervisorActor, address=address, uid=SupervisorActor.uid()
-    )
-
-    all_gen = await superivsor_actor.get_all_generators()
-    assert len(all_gen) == 0
-    output = []
-    async for x in await superivsor_actor.chat():
+    pool = await xo.create_actor_pool(address, 2)
+    async with pool:
+        await xo.create_actor(WorkerActor, address=address, uid=WorkerActor.uid())
+        superivsor_actor = await xo.create_actor(
+            SupervisorActor, address=address, uid=SupervisorActor.uid()
+        )
+
+        all_gen = await superivsor_actor.get_all_generators()
+        assert len(all_gen) == 0
+        output = []
+        async for x in await superivsor_actor.chat():
+            all_gen = await superivsor_actor.get_all_generators()
+            assert len(all_gen) == 1
+            output.append(x)
+        all_gen = await superivsor_actor.get_all_generators()
+        assert len(all_gen) == 0
+        assert output == [
+            "sync",
+            "h",
+            "e",
+            "l",
+            "l",
+            "o",
+            " ",
+            "o",
+            "s",
+            "c",
+            "a",
+            "r",
+            " ",
+            "b",
+            "y",
+            " ",
+            "s",
+            "y",
+            "n",
+            "c",
+            "async",
+            "h",
+            "e",
+            "l",
+            "l",
+            "o",
+            " ",
+            "o",
+            "s",
+            "c",
+            "a",
+            "r",
+            " ",
+            "b",
+            "y",
+            " ",
+            "a",
+            "s",
+            "y",
+            "n",
+            "c",
+        ]
+
+        with pytest.raises(Exception, match="intent"):
+            async for _ in await superivsor_actor.with_exception():
+                pass
+        all_gen = await superivsor_actor.get_all_generators()
+        assert len(all_gen) == 0
+
+        r = await superivsor_actor.with_exception()
+        pickle.loads(pickle.dumps(r))
+        del r
+        await asyncio.sleep(0)
         all_gen = await superivsor_actor.get_all_generators()
-        assert len(all_gen) == 1
-        output.append(x)
-    all_gen = await superivsor_actor.get_all_generators()
-    assert len(all_gen) == 0
-    assert output == [
-        "sync",
-        "h",
-        "e",
-        "l",
-        "l",
-        "o",
-        " ",
-        "o",
-        "s",
-        "c",
-        "a",
-        "r",
-        " ",
-        "b",
-        "y",
-        " ",
-        "s",
-        "y",
-        "n",
-        "c",
-        "async",
-        "h",
-        "e",
-        "l",
-        "l",
-        "o",
-        " ",
-        "o",
-        "s",
-        "c",
-        "a",
-        "r",
-        " ",
-        "b",
-        "y",
-        " ",
-        "a",
-        "s",
-        "y",
-        "n",
-        "c",
-    ]
-
-    with pytest.raises(Exception, match="intent"):
-        async for _ in await superivsor_actor.with_exception():
-            pass
-    all_gen = await superivsor_actor.get_all_generators()
-    assert len(all_gen) == 0
-
-    r = await superivsor_actor.with_exception()
-    pickle.loads(pickle.dumps(r))
-    del r
-    await asyncio.sleep(0)
-    all_gen = await superivsor_actor.get_all_generators()
-    assert len(all_gen) == 0
-
-    for f in [superivsor_actor.mix_gen, superivsor_actor.mix_gen2]:
-        out = []
-        async for x in await f(1):
-            out.append(x)
-        assert out == [0, 1, 2]
-        out = []
-        async for x in await f(2):
-            out.append(x)
-        assert out == [0, 1, 2]
-        assert 0 == await f(0)
+        assert len(all_gen) == 0
+
+        for f in [superivsor_actor.mix_gen, superivsor_actor.mix_gen2]:
+            out = []
+            async for x in await f(1):
+                out.append(x)
+            assert out == [0, 1, 2]
+            out = []
+            async for x in await f(2):
+                out.append(x)
+            assert out == [0, 1, 2]
+            assert 0 == await f(0)
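
The behavioral core of this patch is that sub pools are no longer daemonic processes; instead each sub pool receives the main pool's PID ("main_pool_pid") and polls it with psutil, stopping itself once the main pool is gone (see SubActorPoolBase._watch_main_pool above), which is also what allows actors to spawn child processes of their own (test_process_in_actor). Below is a minimal standalone sketch of that parent-watchdog pattern, assuming only psutil and asyncio; the helper names watch_parent and main are illustrative and not part of the xoscar API.

    import asyncio
    import os

    import psutil


    async def watch_parent(parent_pid: int, interval: float = 0.1) -> None:
        # Illustrative helper (not xoscar API): return once the process with
        # ``parent_pid`` has exited, mirroring SubActorPoolBase._watch_main_pool.
        while True:
            try:
                # psutil.Process raises NoSuchProcess once the PID is gone;
                # status() runs in a thread so the event loop is never blocked.
                parent = psutil.Process(parent_pid)
                await asyncio.to_thread(parent.status)
            except (psutil.NoSuchProcess, ProcessLookupError):
                return
            await asyncio.sleep(interval)


    async def main() -> None:
        # In a sub pool this would be the "main_pool_pid" passed through the
        # pool config; watching our own parent process keeps the demo runnable.
        await watch_parent(os.getppid())
        print("parent exited, shutting down")


    if __name__ == "__main__":
        asyncio.run(main())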