Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add RayCliHook and RayCliOperator #37

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

kyle-hamlin
Copy link
Collaborator

Hi team,

This was a first attempt at integrating Airflow and Ray together using a wrapper for the Ray CLI. I wasn't a big fan of having to fetch the cluster_config YAML and script file so I abandoned this approach in favor of using the KubernetePodOperator so as to keep Airflow and Ray decoupled from one another. That being said, I understand there are many use cases out there and some may be completely happy using this kind of operator.

Aside from my deployment thoughts, I do think it would be better to convert this operator to be a wrapper for the Ray SDK rather than the CLI as it would make a number of tasks quite a bit easier. Here is some seriously rough pseudo-code for how we might go about structuring the hook and ultimately a series of operators:

from ray.autoscaler.sdk import (
    create_or_update_cluster,
    run_on_cluster,
    teardown_cluster,
)

from typing import Union

class RaySDKHook:

    def __init__(
        cluster_config: Union[dict, str],
        ...
    ):

    self.cluster_config = self._get_cluster_config()


    def _get_cluster_config(self):
        if isinstance(self.cluster_config, dict):
            return self.cluster_config
        elif isinstance(self.cluster_config, str):
            if self.cluster_config.startswith("s3://"):
                # Get from s3
                # Could also apply templating here
            elif self.cluster_config.startswith("gs://"):
                # Get from gs
                # Could also apply templating here
            else:
                # Get from local file system
                # Could also apply templating here


    def start_cluster(self):
        ...
        create_or_update_cluster(cluster_config=self.cluster_config)
        ...
    

    def stop_cluster(self):
        ...
        teardown_cluster(cluster_config=self.cluster_config)
        ...
    

    def submit_job(self):
        ...
        run_on_cluster(cluster_config=self.cluster_config)
        ...


class RayStartClusterOperator:

    def execute():
        hook = RaySDKHook()
        hook.start_cluster()


class RayStopClusterOperator:

    def execute():
        hook = RaySDKHook()
        hook.stop_cluster()


class RayStopClusterOperator:

    def execute():
        hook = RaySDKHook()
        hook.submit_job()

@dimberman
Copy link
Collaborator

This looks great so far. @Mokubyow @grechaw would this just require that we pip install ray on the machine? I imagine the ray CLI doesn't need any extras, yes?

@dimberman
Copy link
Collaborator

I think having a Ray start and ray stop operator makes a LOT of sense here. Even for the decorator it would be great to allow people to start a ray cluster at the start of a DAG and then shut it down at the end of a DAG.

Copy link
Collaborator

@dimberman dimberman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few small things but overall looks really good!

ray_provider/operators/ray_cli.py Outdated Show resolved Hide resolved

def execute(self, context):
self.hook = self.get_hook()
self.hook.run()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not return self.hook.run()? Seems like that function is returning stdoout.

ray_provider/hooks/ray_cli.py Outdated Show resolved Hide resolved
@kyle-hamlin
Copy link
Collaborator Author

kyle-hamlin commented Sep 9, 2021

@dimberman Thanks for the feedback! I'd actually like to get some feedback from the Ray team on using the SDK methods as I outlined above vs. the CLI wrapper approach before proceeding. As I mentioned, I think it will be a bit easier to write/extend and create robust operator/s that way. @richardliaw @worldveil @grechaw Thoughts?

@grechaw
Copy link
Contributor

grechaw commented Sep 9, 2021

I think it looks good. I have my usual concerns about it not being OOTB supported on Anyscale.

It's not like we have a ready way to make this work with Anyscale anyhow, but your approach will make that simple to graft in when we prioritize it.

@dimberman
Copy link
Collaborator

@Mokubyow I spoke with the Anyscale team and it sounds like there's no real way to do this via the SDK. However, the fact that you're using Popen instead of using Bash directly should be sufficient to prevent injections so should be good from a security standpoint.

@kyle-hamlin
Copy link
Collaborator Author

@dimberman @worldveil @grechaw I've pushed a WIP implementation of what using the SDK could look like vs wrapping the CLI directly. I'm partial to the SDK approach, as I've expressed, so I believe much of the decision on which route to pursue will come down to how stable the ray team thinks the SDK will be. Anyway, looking forward to chatting about both approaches and getting something out to the community!

@kyle-hamlin kyle-hamlin marked this pull request as ready for review September 22, 2021 23:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants