Skip to content

Commit

Permalink
Minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
codingl2k1 committed Jan 4, 2024
1 parent 12dec4c commit 20c46f0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 9 deletions.
36 changes: 29 additions & 7 deletions python/xoscar/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import asyncio
import functools
import inspect
import logging
import os
import sys
import threading
import uuid
from collections import defaultdict
Expand Down Expand Up @@ -45,6 +48,8 @@
from .backends.config import ActorPoolConfig
from .backends.pool import MainActorPoolType

logger = logging.getLogger(__name__)


async def create_actor(
actor_cls: Type, *args, uid=None, address=None, **kwargs
Expand Down Expand Up @@ -296,6 +301,7 @@ def __init__(self, uid: str, actor_addr: str, actor_uid: str):
self._actor_addr = actor_addr
self._actor_uid = actor_uid
self._actor_ref = None
self._gc_destroy = True

async def destroy(self):
if self._actor_ref is None:
Expand All @@ -308,15 +314,24 @@ async def destroy(self):
def __del__(self):
# It's not a good idea to spawn a new thread and join in __del__,
# but currently it's the only way to GC the generator.
thread = threading.Thread(
target=asyncio.run, args=(self.destroy(),), daemon=True
)
thread.start()
thread.join()
# TODO(codingl2k1): This __del__ may hangs if the program is exiting.
if self._gc_destroy:
thread = threading.Thread(
target=asyncio.run, args=(self.destroy(),), daemon=True
)
thread.start()
thread.join()

def __aiter__(self):
return self

def __getstate__(self):
# Transfer gc destroy during serialization.
state = dict(**super().__getstate__())
state["_gc_destroy"] = True
self._gc_destroy = False
return state

async def __anext__(self) -> T:
if self._actor_ref is None:
self._actor_ref = await actor_ref(
Expand Down Expand Up @@ -410,15 +425,20 @@ async def _async_wrapper(_gen):
f"but a {type(gen)} is got."
)
except Exception as e:
logger.exception(
f"Destory generator {generator_uid} due to an error encountered."
)
await self.__xoscar_destroy_generator__(generator_uid)
del gen # Avoid exception hold generator reference.
raise e
if r is stop:
await self.__xoscar_destroy_generator__(generator_uid)
del gen # Avoid exception hold generator reference.
raise Exception("StopIteration")
else:
return r
else:
raise RuntimeError(f"no iterator with id: {generator_uid}")
raise RuntimeError(f"No iterator with id: {generator_uid}")

async def __xoscar_destroy_generator__(self, generator_uid: str):
"""
Expand All @@ -429,7 +449,8 @@ async def __xoscar_destroy_generator__(self, generator_uid: str):
generator_uid: str
The uid of generator
"""
return self._generators.pop(generator_uid, None)
logger.debug("Destroy generator: %s", generator_uid)
self._generators.pop(generator_uid, None)


def generator(func):
Expand All @@ -443,6 +464,7 @@ async def _wrapper(self, *args, **kwargs):
r = await func(self, *args, **kwargs)
if inspect.isgenerator(r) or inspect.isasyncgen(r):
gen_uid = uuid.uuid1().hex
logger.debug("Create generator: %s", gen_uid)
self._generators[gen_uid] = r
return IteratorWrapper(gen_uid, self.address, self.uid)
else:
Expand Down
2 changes: 1 addition & 1 deletion python/xoscar/core.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ cdef class ActorRef:
return create_actor_ref, (self.address, self.uid)

def __getattr__(self, item):
if item.startswith('_'):
if item.startswith('_') and item not in ["__xoscar_next__", "__xoscar_destroy_generator__"]:
return object.__getattribute__(self, item)

try:
Expand Down
4 changes: 3 additions & 1 deletion python/xoscar/tests/test_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import asyncio
import os
import time

import pytest
Expand Down Expand Up @@ -160,7 +161,8 @@ async def test_generator():
all_gen = await superivsor_actor.get_all_generators()
assert len(all_gen) == 0

await asyncio.create_task(superivsor_actor.with_exception())
r = await superivsor_actor.with_exception()
del r
await asyncio.sleep(0)
all_gen = await superivsor_actor.get_all_generators()
assert len(all_gen) == 0
Expand Down

0 comments on commit 20c46f0

Please sign in to comment.