From c667d4ad7c3e4e7e8c84777b174b999b19cb173b Mon Sep 17 00:00:00 2001 From: paer Date: Mon, 18 Sep 2023 23:32:41 +0800 Subject: [PATCH] More explicit naming Signed-off-by: paer --- fed/__init__.py | 4 ++-- fed/_private/message_queue.py | 12 ++++++------ fed/api.py | 5 ++--- fed/cleanup.py | 8 ++++---- fed/exceptions.py | 4 ++-- fed/tests/test_cross_silo_error.py | 8 ++++---- 6 files changed, 20 insertions(+), 21 deletions(-) diff --git a/fed/__init__.py b/fed/__init__.py index 5c55dd4..7636dd4 100644 --- a/fed/__init__.py +++ b/fed/__init__.py @@ -16,7 +16,7 @@ shutdown) from fed.proxy.barriers import recv, send from fed.fed_object import FedObject -from fed.exceptions import RemoteError +from fed.exceptions import FedRemoteError __all__ = [ "get", @@ -27,5 +27,5 @@ "recv", "send", "FedObject", - "RemoteError" + "FedRemoteError" ] diff --git a/fed/_private/message_queue.py b/fed/_private/message_queue.py index f55ecdf..c4a1b6e 100644 --- a/fed/_private/message_queue.py +++ b/fed/_private/message_queue.py @@ -60,12 +60,12 @@ def _loop(): self._thread = threading.Thread(target=_loop, name=self._thread_name) self._thread.start() - def push(self, message): + def append(self, message): self._queue.append(message) def notify_to_exit(self): logger.info(f"Notify message polling thread[{self._thread_name}] to exit.") - self.push(STOP_SYMBOL) + self.append(STOP_SYMBOL) def stop(self): """ @@ -79,10 +79,10 @@ def stop(self): If False: forcelly kill the for-loop sub-thread. """ if threading.current_thread() == self._thread: - logger.error(f"Can't stop the message queue in the message" - f"polling thread[{self._thread_name}], ignore it, this." - f"could bring unknown timing problem.") - return + logger.error(f"Can't stop the message queue in the message " + f"polling thread[{self._thread_name}]. Ignore it as this" + f"could bring unknown time sequence problems.") + raise RuntimeError("Thread can't kill itself") # TODO(NKcqx): Force kill sub-thread by calling `._stop()` will # encounter AssertionError because sub-thread's lock is not released. diff --git a/fed/api.py b/fed/api.py index 1fccfcb..d89cc71 100644 --- a/fed/api.py +++ b/fed/api.py @@ -27,7 +27,7 @@ from fed._private import constants from fed._private.fed_actor import FedActorHandle from fed._private.fed_call_holder import FedCallHolder -from fed.exceptions import RemoteError +from fed.exceptions import FedRemoteError from fed._private.global_context import ( init_global_context, get_global_context, @@ -436,14 +436,13 @@ def get( fed_object._cache_ray_object_ref(received_ray_object_ref) ray_refs.append(received_ray_object_ref) - # ray.get(ray_refs) try: values = ray.get(ray_refs) if is_individual_id: values = values[0] return values except RayError as e: - if isinstance(e.cause, RemoteError): + if isinstance(e.cause, FedRemoteError): logger.warning("Encounter RemoteError happend in other parties" f", prepare to exit, error message: {e.cause}") if (get_global_context().acquire_shutdown_flag()): diff --git a/fed/cleanup.py b/fed/cleanup.py index cbb8e74..6b974f8 100644 --- a/fed/cleanup.py +++ b/fed/cleanup.py @@ -17,7 +17,7 @@ import signal import threading from fed._private.message_queue import MessageQueueManager -from fed.exceptions import RemoteError +from fed.exceptions import FedRemoteError from ray.exceptions import RayError import ray @@ -105,9 +105,9 @@ def push_to_sending(self, """ msg_pack = (obj_ref, dest_party, upstream_seq_id, downstream_seq_id) if (is_error): - self._sending_error_q.push(msg_pack) + self._sending_error_q.append(msg_pack) else: - self._sending_data_q.push(msg_pack) + self._sending_data_q.append(msg_pack) def _signal_exit(self): """ @@ -159,7 +159,7 @@ def _process_data_sending_task_return(self, message): from fed.proxy.barriers import send # TODO(NKcqx): Cascade broadcast to all parties error_trace = e.cause if self._expose_error_trace else None - send(dest_party, RemoteError(self._current_party, error_trace), + send(dest_party, FedRemoteError(self._current_party, error_trace), upstream_seq_id, downstream_seq_id, True) res = False diff --git a/fed/exceptions.py b/fed/exceptions.py index 719fb7c..dad4abf 100644 --- a/fed/exceptions.py +++ b/fed/exceptions.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -class RemoteError(Exception): +class FedRemoteError(Exception): def __init__(self, src_party: str, cause: Exception) -> None: self._src_party = src_party self._cause = cause def __str__(self): - error_msg = f'RemoteError occurred at {self._src_party}' + error_msg = f'FedRemoteError occurred at {self._src_party}' if self._cause is not None: error_msg += f" caused by {str(self._cause)}" return error_msg diff --git a/fed/tests/test_cross_silo_error.py b/fed/tests/test_cross_silo_error.py index 1af8755..17547a0 100644 --- a/fed/tests/test_cross_silo_error.py +++ b/fed/tests/test_cross_silo_error.py @@ -21,7 +21,7 @@ import sys from unittest.mock import Mock -from fed.exceptions import RemoteError +from fed.exceptions import FedRemoteError class MyError(Exception): @@ -70,7 +70,7 @@ def run(party): with pytest.raises(Exception) as e: fed.get(o) if party == 'bob': - assert isinstance(e.value.cause, RemoteError) + assert isinstance(e.value.cause, FedRemoteError) assert 'RemoteError occurred at alice' in str(e.value.cause) assert "normal task Error" in str(e.value.cause) else: @@ -120,7 +120,7 @@ def run2(party): fed.get(o) if party == 'bob': - assert isinstance(e.value.cause, RemoteError) + assert isinstance(e.value.cause, FedRemoteError) assert 'RemoteError occurred at alice' in str(e.value.cause) assert "actor task Error" in str(e.value.cause) my_failure_handler.assert_called_once() @@ -170,7 +170,7 @@ def run3(party): with pytest.raises(Exception) as e: fed.get(o) if party == 'bob': - assert isinstance(e.value.cause, RemoteError) + assert isinstance(e.value.cause, FedRemoteError) assert 'RemoteError occurred at alice' in str(e.value.cause) assert 'caused by' not in str(e.value.cause) else: