Skip to content

Commit

Permalink
More explicit naming
Browse files Browse the repository at this point in the history
Signed-off-by: paer <chenqixiang.cqx@antgroup.com>
  • Loading branch information
paer committed Sep 18, 2023
1 parent 8a170d3 commit c667d4a
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 21 deletions.
4 changes: 2 additions & 2 deletions fed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
shutdown)
from fed.proxy.barriers import recv, send
from fed.fed_object import FedObject
from fed.exceptions import RemoteError
from fed.exceptions import FedRemoteError

__all__ = [
"get",
Expand All @@ -27,5 +27,5 @@
"recv",
"send",
"FedObject",
"RemoteError"
"FedRemoteError"
]
12 changes: 6 additions & 6 deletions fed/_private/message_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ def _loop():
self._thread = threading.Thread(target=_loop, name=self._thread_name)
self._thread.start()

def push(self, message):
def append(self, message):
self._queue.append(message)

def notify_to_exit(self):
logger.info(f"Notify message polling thread[{self._thread_name}] to exit.")
self.push(STOP_SYMBOL)
self.append(STOP_SYMBOL)

def stop(self):
"""
Expand All @@ -79,10 +79,10 @@ def stop(self):
If False: forcelly kill the for-loop sub-thread.
"""
if threading.current_thread() == self._thread:
logger.error(f"Can't stop the message queue in the message"
f"polling thread[{self._thread_name}], ignore it, this."
f"could bring unknown timing problem.")
return
logger.error(f"Can't stop the message queue in the message "
f"polling thread[{self._thread_name}]. Ignore it as this"
f"could bring unknown time sequence problems.")
raise RuntimeError("Thread can't kill itself")

# TODO(NKcqx): Force kill sub-thread by calling `._stop()` will
# encounter AssertionError because sub-thread's lock is not released.
Expand Down
5 changes: 2 additions & 3 deletions fed/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from fed._private import constants
from fed._private.fed_actor import FedActorHandle
from fed._private.fed_call_holder import FedCallHolder
from fed.exceptions import RemoteError
from fed.exceptions import FedRemoteError
from fed._private.global_context import (
init_global_context,
get_global_context,
Expand Down Expand Up @@ -436,14 +436,13 @@ def get(
fed_object._cache_ray_object_ref(received_ray_object_ref)
ray_refs.append(received_ray_object_ref)

# ray.get(ray_refs)
try:
values = ray.get(ray_refs)
if is_individual_id:
values = values[0]
return values
except RayError as e:
if isinstance(e.cause, RemoteError):
if isinstance(e.cause, FedRemoteError):
logger.warning("Encounter RemoteError happend in other parties"
f", prepare to exit, error message: {e.cause}")
if (get_global_context().acquire_shutdown_flag()):
Expand Down
8 changes: 4 additions & 4 deletions fed/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import signal
import threading
from fed._private.message_queue import MessageQueueManager
from fed.exceptions import RemoteError
from fed.exceptions import FedRemoteError
from ray.exceptions import RayError

import ray
Expand Down Expand Up @@ -105,9 +105,9 @@ def push_to_sending(self,
"""
msg_pack = (obj_ref, dest_party, upstream_seq_id, downstream_seq_id)
if (is_error):
self._sending_error_q.push(msg_pack)
self._sending_error_q.append(msg_pack)
else:
self._sending_data_q.push(msg_pack)
self._sending_data_q.append(msg_pack)

def _signal_exit(self):
"""
Expand Down Expand Up @@ -159,7 +159,7 @@ def _process_data_sending_task_return(self, message):
from fed.proxy.barriers import send
# TODO(NKcqx): Cascade broadcast to all parties
error_trace = e.cause if self._expose_error_trace else None
send(dest_party, RemoteError(self._current_party, error_trace),
send(dest_party, FedRemoteError(self._current_party, error_trace),
upstream_seq_id, downstream_seq_id, True)

res = False
Expand Down
4 changes: 2 additions & 2 deletions fed/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

class RemoteError(Exception):
class FedRemoteError(Exception):
def __init__(self, src_party: str, cause: Exception) -> None:
self._src_party = src_party
self._cause = cause

def __str__(self):
error_msg = f'RemoteError occurred at {self._src_party}'
error_msg = f'FedRemoteError occurred at {self._src_party}'
if self._cause is not None:
error_msg += f" caused by {str(self._cause)}"
return error_msg
8 changes: 4 additions & 4 deletions fed/tests/test_cross_silo_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import sys

from unittest.mock import Mock
from fed.exceptions import RemoteError
from fed.exceptions import FedRemoteError


class MyError(Exception):
Expand Down Expand Up @@ -70,7 +70,7 @@ def run(party):
with pytest.raises(Exception) as e:
fed.get(o)
if party == 'bob':
assert isinstance(e.value.cause, RemoteError)
assert isinstance(e.value.cause, FedRemoteError)
assert 'RemoteError occurred at alice' in str(e.value.cause)
assert "normal task Error" in str(e.value.cause)
else:
Expand Down Expand Up @@ -120,7 +120,7 @@ def run2(party):
fed.get(o)

if party == 'bob':
assert isinstance(e.value.cause, RemoteError)
assert isinstance(e.value.cause, FedRemoteError)
assert 'RemoteError occurred at alice' in str(e.value.cause)
assert "actor task Error" in str(e.value.cause)
my_failure_handler.assert_called_once()
Expand Down Expand Up @@ -170,7 +170,7 @@ def run3(party):
with pytest.raises(Exception) as e:
fed.get(o)
if party == 'bob':
assert isinstance(e.value.cause, RemoteError)
assert isinstance(e.value.cause, FedRemoteError)
assert 'RemoteError occurred at alice' in str(e.value.cause)
assert 'caused by' not in str(e.value.cause)
else:
Expand Down

0 comments on commit c667d4a

Please sign in to comment.