[Core][Compiled Graph] Execute DAG on Actor's Main Thread #48608

Open · wants to merge 3 commits into master
Conversation

@xslingcn commented Nov 6, 2024

Why are these changes needed?

As mentioned in #46336, the current implementation executes all aDAGs in a background concurrency group, _ray_system, while the actors' regular tasks run in their default concurrency group. This discrepancy prevents the DAG from accessing thread-local state in the actor that was initialized before DAG execution. For example, consider the following code:

import ray
import threading
from ray.dag import InputNode

@ray.remote
class MyActor:
    def __init__(self):
        # data local to actor default executor thread
        self.local_data = threading.local()
        self.local_data.seed = 42

    def compute(self, value):
        return value + self.local_data.seed

actor = MyActor.remote()

with InputNode() as inp:
    dag = actor.compute.bind(inp)

# The DAG runs in the _ray_system concurrency group and cannot access actor.local_data
compiled_dag = dag.experimental_compile()
print(ray.get(compiled_dag.execute(10)))

which will raise the error:

Traceback (most recent call last):
...
ray.exceptions.RayTaskError(AttributeError): ray::MyActor.__ray_call__() (pid=3136275, ip=172.0.0.1)
  File "/Code/ray/run_thread_local_dag.py", line 15, in compute
    return value + self.local_data.seed
                   ^^^^^^^^^^^^^^^^^^^^
AttributeError: '_thread._local' object has no attribute 'seed'

This PR makes the DAG execution loop run on the actor's default executor (ref), which ensures that the DAG runs on the same thread as the actor. With this change, the example above produces the expected output.
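To make the intended behavior concrete, here is a minimal sketch (not part of this PR; ThreadCheckActor is a hypothetical name) that checks whether the compiled DAG task runs on the same thread as the actor's __init__:

import threading
import ray
from ray.dag import InputNode

@ray.remote
class ThreadCheckActor:
    def __init__(self):
        # Record the thread that runs the actor's default concurrency group.
        self.init_thread_id = threading.get_ident()

    def check(self, _):
        # With this change, the compiled DAG task should execute on the same
        # thread that ran __init__, so the idents should match.
        return threading.get_ident() == self.init_thread_id

actor = ThreadCheckActor.remote()

with InputNode() as inp:
    dag = actor.check.bind(inp)

compiled_dag = dag.experimental_compile()
print(ray.get(compiled_dag.execute(0)))  # True with this PR, False before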

The thread_name API discussed in the original issue will be implemented in a separate PR later.

Related issue number

#46336

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: xsling <me@xsl.sh>
@stephanie-wang (Contributor) commented

Thanks, @xslingcn ! Can you add a test similar to the one that you have in the PR description? Also, I think there may be some issues since there are sometimes background system tasks that need to run on a different thread from the execution one, and now they may be blocked. I'll unblock the buildkite/premerge tests to run so you can see the failures.

@stephanie-wang self-assigned this Nov 12, 2024
@stephanie-wang added the go (add ONLY when ready to merge, run all tests) label Nov 12, 2024
@rkooo567 self-assigned this Nov 12, 2024
@jcotant1 added the core (Issues that should be addressed in Ray Core) and compiled-graphs labels Nov 15, 2024
@xslingcn (Author) commented Nov 16, 2024

Thanks @stephanie-wang for reviewing this!

Can you add a test similar to the one that you have in the PR description?

Done with 8f6be8f.

Also, I think there may be some issues since there are sometimes background system tasks that need to run on a different thread from the execution one, and now they may be blocked.

It seems that Ray now fails to tear down the DAGs, even after I moved the cancellation tasks to the _ray_system concurrency group. Any hints on how to debug which tasks are blocking the thread?

diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py

@@ -1940,7 +1938,9 @@ class CompiledDAG:
                     logger.info(f"Cancelling compiled worker on actor: {actor}")
                 # Cancel all actor loops in parallel.
                 cancel_refs = [
-                    actor.__ray_call__.remote(do_cancel_executable_tasks, tasks)
+                    actor.__ray_call__.options(
+                        concurrency_group="_ray_system"
+                    ).remote(do_cancel_executable_tasks, tasks)
                     for actor, tasks in outer.actor_to_executable_tasks.items()
                 ]
                 for cancel_ref in cancel_refs:
$ pytest -v -s python/ray/dag/tests/experimental/test_multi_node_dag.py
...
2024-11-16 10:34:10,738 INFO compiled_dag_node.py:1933 -- Tearing down compiled DAG
2024-11-16 10:34:10,738 INFO compiled_dag_node.py:1938 -- Cancelling compiled worker on actor: Actor(Actor, 95b3104e737fd143bb49c3a001000000)
2024-11-16 10:34:10,739 INFO compiled_dag_node.py:1938 -- Cancelling compiled worker on actor: Actor(Actor, f8bf38b6e3680bccb52e420a01000000)
2024-11-16 10:34:10,739 INFO compiled_dag_node.py:1938 -- Cancelling compiled worker on actor: Actor(Actor, 5080863784c80043a0018f8201000000)
2024-11-16 10:34:40,745 ERROR compiled_dag_node.py:1954 -- Error cancelling worker task
Traceback (most recent call last):
  File "/root/ray/python/ray/dag/compiled_dag_node.py", line 1948, in teardown
    ray.get(cancel_ref, timeout=30)
  File "/root/ray/python/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/root/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/root/ray/python/ray/_private/worker.py", line 2755, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/ray/python/ray/_private/worker.py", line 882, in get_objects
    ] = self.core_worker.get_objects(
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "python/ray/_raylet.pyx", line 3486, in ray._raylet.CoreWorker.get_objects
    check_status(op_status)
  File "python/ray/includes/common.pxi", line 81, in ray._raylet.check_status
    raise GetTimeoutError(message)
ray.exceptions.GetTimeoutError: Get timed out: some object(s) not ready.
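
One hypothetical way to inspect what is blocked (a sketch, not part of this PR): dump the stack of every thread inside the actor via __ray_call__, routing the call through the _ray_system concurrency group so it does not queue behind the possibly-blocked default executor:

import faulthandler
import sys

def dump_actor_threads(self):
    # Write the stack of every thread in this actor process to stderr.
    faulthandler.dump_traceback(file=sys.stderr, all_threads=True)

# Hypothetical usage while teardown is hanging:
# ray.get(
#     actor.__ray_call__.options(concurrency_group="_ray_system").remote(
#         dump_actor_threads
#     )
# )

Alternatively, an external tool such as py-spy (py-spy dump --pid <actor_pid>) can show the same per-thread stacks without touching the actor.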
