Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for incorrect channel read behavior after accelerated DAG teardown #46320

Merged
merged 9 commits into from
Jul 1, 2024

Conversation

jackhumphries
Copy link
Contributor

@jackhumphries jackhumphries commented Jun 28, 2024

Why are these changes needed?

Prior to this PR (described in #46284), calling ray.get() on a CompiledDAGRef (i.e., a channel) after DAG teardown would return a large series of zeroes. This issue could be reproduced with this script:

import ray
from ray.dag import InputNode

@ray.remote
class Actor:
    def foo(self, arg):
        return arg
        
a = Actor.remote()
with InputNode() as inp:
    dag = a.foo.bind(inp)
    
dag = dag.experimental_compile()
x = dag.execute(1)
dag.teardown()
# `ray.get(x)` returns a large series of zeroes.
print(ray.get(x))

This issue happened because the channel was unregistered with the mutable object manager on DAG teardown, and thus on a subsequent access to the channel, the core worker thought the channel reference was for a normal immutable Ray object rather than for a channel mutable object. Thus, the core worker was returning the raw underlying memory for the mutable object, and the memory buffers were sized equal to the total size of the underlying memory, not the amount of data in the mutable object.

This PR fixes this issue by properly checking that a channel is either currently registered or previously registered, rather than just checking only that the channel is currently registered.

Related issue number

Closes #46284

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@jackhumphries jackhumphries changed the title Channel error fix after accelerated DAG teardown Fix for incorrect channel read behavior after accelerated DAG teardown Jun 28, 2024
@jackhumphries jackhumphries force-pushed the 46284-fix branch 2 times, most recently from dea62f1 to 2154d1c Compare June 28, 2024 08:20
@kevin85421
Copy link
Member

This issue happened because the channel was unregistered with the mutable object manager on DAG teardown

Where does unregister happen? Are you referring to SetErrorInternal?

the core worker thought the channel reference was for a normal immutable Ray object rather than for a channel mutable object. Thus, the core worker was returning the raw underlying memory for the mutable object

Are you suggesting that we should use GetExperimentalMutableObjects to read the object for the correct result, but due to the channel being unregistered, GetObjects was used instead?

@jackhumphries
Copy link
Contributor Author

This issue happened because the channel was unregistered with the mutable object manager on DAG teardown

Where does unregister happen? Are you referring to SetErrorInternal?

the core worker thought the channel reference was for a normal immutable Ray object rather than for a channel mutable object. Thus, the core worker was returning the raw underlying memory for the mutable object

Are you suggesting that we should use GetExperimentalMutableObjects to read the object for the correct result, but due to the channel being unregistered, GetObjects was used instead?

When I said "unregistered", I meant that SetErrorInternal() was called on the channel, as you said. This method then sets reader_registered to false for the channel.

For the second question, what you said is correct.

Copy link
Contributor

@ruisearch42 ruisearch42 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great findings!

src/ray/core_worker/core_worker.cc Outdated Show resolved Hide resolved
src/ray/core_worker/core_worker.cc Outdated Show resolved Hide resolved
Copy link
Member

@kevin85421 kevin85421 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description makes sense to me, but I have a question:

Based on my observation at #46284 (comment), the actor method can get an error message like (Actor pid=2054169) check_status: Channel closed. False True. This means that:

  • Shared memory channel calls close(), and writes error into has_error successfully. This also implies that the channels are unregistered.
  • However, it can still get the error which means it calls CheckHasError which seems only be called in ReadAcquire. If it is unregistered, it should not call ReadAcquire. There might be some time difference between setting has_error and unregistration. However, the actor always prints the log.

src/ray/core_worker/core_worker.cc Show resolved Hide resolved
src/ray/core_worker/core_worker.cc Outdated Show resolved Hide resolved
@jackhumphries
Copy link
Contributor Author

jackhumphries commented Jun 28, 2024

The PR description makes sense to me, but I have a question:

Based on my observation at #46284 (comment), the actor method can get an error message like (Actor pid=2054169) check_status: Channel closed. False True. This means that:

  • Shared memory channel calls close(), and writes error into has_error successfully. This also implies that the channels are unregistered.
  • However, it can still get the error which means it calls CheckHasError which seems only be called in ReadAcquire. If it is unregistered, it should not call ReadAcquire. There might be some time difference between setting has_error and unregistration. However, the actor always prints the log.

I think what's happening is the actor is calling WriteAcquire() on the same channel, which does not call WriterChannelRegistered() to check if the channel is registered. Currently, CoreWorker::Get() does check ReaderChannelRegistered(), which is why the behavior is different.

After this PR is merged, can you add a check for WriterChannelRegistered()? I think this could be a good way to get more familiar with the C++ codebase. Thanks!

Copy link
Member

@kevin85421 kevin85421 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

@kevin85421
Copy link
Member

After this PR is merged, can you add a check for WriterChannelRegistered()? I think this could be a good way to get more familiar with the C++ codebase. Thanks!

Sounds good. Thanks!

@jackhumphries jackhumphries added the go add ONLY when ready to merge, run all tests label Jun 28, 2024
Copy link
Contributor

@ruisearch42 ruisearch42 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to approve. Still wondering if the API can be improved.

src/ray/core_worker/core_worker.cc Outdated Show resolved Hide resolved
python/ray/dag/tests/experimental/test_accelerated_dag.py Outdated Show resolved Hide resolved
src/ray/core_worker/experimental_mutable_object_provider.h Outdated Show resolved Hide resolved
@stephanie-wang
Copy link
Contributor

Should we just not reset reader_registered and writer_registered to false for now? That seems like a simpler fix.

@jackhumphries
Copy link
Contributor Author

Should we just not reset reader_registered and writer_registered to false for now? That seems like a simpler fix.

I'd prefer to keep this as is, if that's alright. When I do the channel garbage collection work, I would imagine I'm going to need to implement what is currently in this PR if we don't keep it.

Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
Signed-off-by: Jack Humphries <1645405+jackhumphries@users.noreply.github.com>
@can-anyscale can-anyscale merged commit 8a0d633 into ray-project:master Jul 1, 2024
5 of 6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[core][experimental] Calling ray.get() on CompiledDAGRef after dag.teardown() or actor failure hangs
5 participants