
[PoC] Cross-silo error broadcasting #175

Merged: 21 commits merged into main from cross_silo_err on Sep 19, 2023

Conversation

@NKcqx (Collaborator) commented Aug 23, 2023

Background

Before this PR, when DAG execution encountered an error in 'alice', the following happened (see the attached diagram):

In alice, both the main thread and the data-sending thread raised the error, and the process exited.
In bob, since it needed input from 'alice', it waited for 'alice' forever, regardless of whether 'alice' still existed.

Therefore, we need a mechanism to inform the other participants when DAG execution raises an error.

What's in this PR

After this PR, the following happens instead (see the attached diagram):

In alice, when the data-sending thread finds a RayTaskError indicating an execution failure, it wraps the error into a RemoteError object and sends that to bob in place of the original data object.
In bob, the main thread polls data from the receiver actor; on finding that the data is of type RemoteError, it re-raises it, producing the same exception as in alice.
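The two halves of this flow can be sketched roughly as follows; RemoteError's fields and the send_data/recv_data helpers here are simplified stand-ins for RayFed's actual internals, not its real API:

```python
class RemoteError(Exception):
    """Wraps an execution failure so it can be shipped across silos."""

    def __init__(self, src_party, cause):
        self._src_party = src_party
        self._cause = cause

    def __str__(self):
        return f'RemoteError occurred at {self._src_party} caused by {self._cause}'


def send_data(dest_party, src_party, data):
    """Sender side: replace a failed result with a RemoteError object."""
    if isinstance(data, Exception):  # e.g. a RayTaskError
        data = RemoteError(src_party, data)
    return data  # transmitted to dest_party in the real system


def recv_data(data):
    """Receiver side: re-raise the error as if it happened locally."""
    if isinstance(data, RemoteError):
        raise data
    return data
```

With this shape, bob's main thread does not need to distinguish local from remote failures: both surface as an exception at the point where the data would have been used.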

The threading model in this PR is shown in the attached diagram.

Explanation of the _atomic_shutdown_flag

When a failure happens, both the main thread and the data thread see the error and trigger shutdown, which would execute the failure handler twice. The typical way to ensure failure_handler runs only once is to set a flag recording whether it has already run, and to guard the flag with a threading.Lock, since it is a critical section.

However, this causes a deadlock, as shown in the attached diagram. The data thread triggers the shutdown stage by sending a SIGINT signal, which is delivered as a KeyboardInterrupt (step 8). To handle the exception, the runtime preserves the context of the current thread, including the lock acquired in step 6, and switches to the error handler, i.e. the signal handler, in step 9. Since the lock has not yet been released, acquiring the same lock again deadlocks (step 10).

The solution is to try the lock before sending the signal. That lock is the _atomic_shutdown_flag.
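This check-before-signal idea can be sketched as follows; the function names and module layout here are illustrative, not RayFed's exact implementation:

```python
import os
import signal
import threading

# The flag guarding the one-shot failure handler (assumed name).
_atomic_shutdown_flag = threading.Lock()


def try_acquire_shutdown_flag() -> bool:
    """Non-blocking acquire: returns True for exactly one caller."""
    return _atomic_shutdown_flag.acquire(blocking=False)


def trigger_failure_shutdown() -> None:
    # Take the flag BEFORE sending SIGINT. If another thread already
    # holds it, shutdown is in progress, and interrupting the main
    # thread could deadlock the signal handler on this very lock.
    if try_acquire_shutdown_flag():
        os.kill(os.getpid(), signal.SIGINT)
```

Because the acquire is non-blocking and happens before the interrupt is raised, the signal handler never has to contend for a lock that the interrupted thread still holds.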

paer added 7 commits on August 23, 2023 (each signed off by paer <chenqixiang.cqx@antgroup.com>)
@NKcqx NKcqx changed the title Cross-silo error broadcasting [PoC] Cross-silo error broadcasting Aug 24, 2023


class MessageQueue:
    def __init__(self, msg_handler, failure_handler=None, name=''):
Collaborator:
Is this thread-safe?

Collaborator Author:
The inner implementation, i.e. deque, is thread-safe, and MessageQueue is only used inside RayFed rather than as a public util.
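As a rough illustration of that point, a single-producer/single-consumer queue built on deque needs no extra lock, because deque.append() and deque.popleft() are atomic; this is a simplified sketch, not the actual RayFed class:

```python
from collections import deque


class MessageQueue:
    """Simplified sketch: one producer thread pushes, one consumer drains."""

    def __init__(self, msg_handler, failure_handler=None, name=''):
        self._queue = deque()
        self._msg_handler = msg_handler
        self._failure_handler = failure_handler
        self._name = name

    def push(self, msg):
        self._queue.append(msg)  # atomic on deque

    def process_one(self):
        """Handle one pending message; return False if the queue is empty."""
        if self._queue:
            msg = self._queue.popleft()  # atomic on deque
            self._msg_handler(msg)
            return True
        return False
```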

fed/cleanup.py Outdated
lambda msg: self._process_data_message(msg),
name='DataQueue')

self._sending_error_q = MessageQueue(
Collaborator:
Why do we need separate queues instead of sharing one queue?

Collaborator Author:
Errors should be sent out-of-band.

Collaborator:
How should "out-of-band" be understood in this context?

@@ -188,6 +188,11 @@ async def get_data(self, src_party, upstream_seq_id, curr_seq_id):
data = await self._proxy_instance.get_data(
src_party, upstream_seq_id, curr_seq_id
)
if isinstance(data, Exception):
Collaborator:
I think we only need to expose exc_type and src_party; the exception message should be private.

Collaborator Author:
Indeed, the exception will be wrapped into a RemoteException that contains only the type and src_party before being sent to other parties.

paer added 8 commits on August 29, 2023 (each signed off by paer <chenqixiang.cqx@antgroup.com>)
@NKcqx NKcqx requested review from a team and fengsp September 11, 2023 06:30
Signed-off-by: paer <chenqixiang.cqx@antgroup.com>
@NKcqx NKcqx added the enhancement New feature or request label Sep 11, 2023
@NKcqx NKcqx added this to the release0.1.1 milestone Sep 11, 2023
@fengsp (Collaborator) commented Sep 11, 2023

Good docs! Maybe it would be better to maintain a RayFed Enhancement Proposals directory and put this in it, so that these docs are better managed?

    self._cause = cause

def __str__(self):
    return f'RemoteError occurred at {self._src_party} caused by {str(self._cause)}'
Collaborator:
Is {str(self._cause)} only the type of the exception? We need to make sure this does not include error-message details.

Collaborator Author:
No, this includes the full error stack.
May I ask why details can't be included?

Collaborator:
Users' error messages may contain sensitive information, which is private.

Collaborator Author:
Got it. I'll add a flag to control whether the stack trace is exposed, defaulting to False.
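A minimal sketch of such a flag; expose_error_trace is a hypothetical name, and the real parameter may differ:

```python
class RemoteError(Exception):
    """Cross-silo error wrapper that hides message details by default."""

    def __init__(self, src_party, cause, expose_error_trace=False):
        self._src_party = src_party
        self._cause = cause
        self._expose_error_trace = expose_error_trace

    def __str__(self):
        if self._expose_error_trace:
            detail = str(self._cause)           # full message, opt-in only
        else:
            detail = type(self._cause).__name__  # type name only, no message
        return f'RemoteError occurred at {self._src_party} caused by {detail}'
```

Defaulting to False means a party never leaks another party's error text unless that party explicitly opted in.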

# can avoid shutdown twice, so that the failure handler can be
# executed only once.

if (get_global_context() is not None):
Collaborator:
Is get_global_context thread safe?

Collaborator Author:
No, it's not. The risk only exists when intended=False, because every function other than the user-defined failure_handler can be executed multiple times without side effects, and failure_handler is only called when intended=False.

As for the intended=False case:

  1. The entry into shutdown(intended=False) inside RayFed is thread-safe; other calls from driver code are not. There is indeed a risk that a user calls shutdown(intended=False) in the driver without acquiring the lock, which would accidentally execute the failure_handler twice. What do you think about exposing only shutdown(intended=True) as the API, and making shutdown(intended=False) a private method?

  2. As described in "The explanation of the _atomic_shutdown_flag", since shutdown (where get_global_context is called) can be entered from the OS interrupt's error handler, any lock inside shutdown may hang if it was acquired but not yet released before the interrupt. Therefore, making get_global_context() thread-safe is not recommended.

Collaborator:
What if the global context is set to None by other threads?

Collaborator Author:
This SHOULD only be done by calling shutdown.
Although there is a risk that a developer breaks this rule by mistake, shutdown indeed cannot use a lock to be made thread-safe.

fed/cleanup.py Outdated
if isinstance(e, RayError):
    logger.info(f"Sending error {e.cause} to {dest_party}.")
    from fed.proxy.barriers import send
    # TODO(NKcqx): Maybe broadcast to all parties?
Collaborator:
I believe we have to broadcast to all parties. For example, with A -> B -> C: if A triggers an error, then A and B exit, but won't C hang forever?

Collaborator Author:
I'll implement that in another PR, since this PR is already hard to understand.

paer added 2 commits on September 11, 2023 (each signed off by paer <chenqixiang.cqx@antgroup.com>)
@jovany-wang (Collaborator):
Solid work! I'm going to review soon.

def get_failure_handler(self) -> Callable[[], None]:
    return self._failure_handler

def acquire_shutdown_flag(self) -> bool:
Collaborator:
It's too weird to pass the method to the cleanup manager. A better practice, I believe, is to implement it in the cleanup manager and then use it wherever needed.

Collaborator Author:
I understand. The reason for doing it this way: the lock has to be maintained in GlobalContext, because it is part of the job context; but accessing it directly in CleanupManager would require importing global_context, which creates a circular import.
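The workaround being described, injecting the bound method instead of importing the module, might look like this (names are illustrative):

```python
import threading
from typing import Callable


class GlobalContext:
    """Owns the shutdown lock, since it is part of the job context."""

    def __init__(self):
        self._shutdown_flag = threading.Lock()

    def acquire_shutdown_flag(self) -> bool:
        # Non-blocking: only the first caller gets True.
        return self._shutdown_flag.acquire(blocking=False)


class CleanupManager:
    """Receives the bound method as a callback, so cleanup.py never
    imports global_context and the import cycle is avoided."""

    def __init__(self, acquire_shutdown_flag: Callable[[], bool]):
        self._acquire_shutdown_flag = acquire_shutdown_flag

    def signal_failure(self) -> bool:
        return self._acquire_shutdown_flag()
```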

fed/cleanup.py Outdated
lambda msg: self._process_data_message(msg),
name='DataQueue')

self._sending_error_q = MessageQueue(
Collaborator:
How should "out-of-band" be understood in this context?

paer added 2 commits on September 15, 2023 (each signed off by paer <chenqixiang.cqx@antgroup.com>)
@NKcqx (Collaborator, Author) commented Sep 18, 2023

In this context, "out-of-band" refers to separating control messages, including the error message, from data messages.
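A toy illustration of this separation (assumed, simplified behavior): errors travel on their own queue and are drained before any pending data, so they are never stuck behind a backlog of ordinary messages:

```python
from collections import deque

# Two channels, mirroring the separate DataQueue / error queue split.
data_q = deque()
error_q = deque()


def next_message():
    """Control messages (errors) are always delivered first."""
    if error_q:
        return ('error', error_q.popleft())
    if data_q:
        return ('data', data_q.popleft())
    return None
```

If both message kinds shared one FIFO queue, an error raised mid-run would wait behind every queued data message before the other party could learn about the failure.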

@jovany-wang (Collaborator) left a comment:

LGTM. Just left some minor comments.

@fengsp @zhouaihui Please take a look; do you think it's OK to add the flag in the next PR?

If False: forcibly kill the for-loop sub-thread.
"""
if threading.current_thread() == self._thread:
    logger.error(f"Can't stop the message queue in the message"
Collaborator:
I believe this should be an assertion, or it should raise an error instead of just logging, because reaching this code path indicates a bug.

@jovany-wang (Collaborator) left a comment:

And let's hold this merge until the REP is done.

Signed-off-by: paer <chenqixiang.cqx@antgroup.com>
@NKcqx (Collaborator, Author) commented Sep 19, 2023

> Good docs! Maybe it would be better to maintain a RayFed Enhancement Proposals directory and put this in it, so that these docs are better managed?

I've filed a RayFed enhancement proposal; please take a look: #179

cc @jovany-wang

@jovany-wang (Collaborator): Thanks. It got approved!

@NKcqx NKcqx merged commit b688dc9 into main Sep 19, 2023
12 checks passed
@jovany-wang jovany-wang deleted the cross_silo_err branch September 19, 2023 06:27
Labels
enhancement New feature or request