Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implements anyscale operators and sensors #41

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

mlopezgez
Copy link

This pull request implements Anyscale operators and sensors.

@glyfnet glyfnet added enhancement New feature or request help wanted Extra attention is needed labels Nov 16, 2022
ray_provider/operators/anyscale_cluster.py Show resolved Hide resolved
ray_provider/operators/anyscale_cluster.py Outdated Show resolved Hide resolved
ray_provider/operators/anyscale_cluster.py Outdated Show resolved Hide resolved
ray_provider/operators/anyscale_cluster.py Outdated Show resolved Hide resolved
ray_provider/operators/anyscale_cluster.py Outdated Show resolved Hide resolved
mlopezgez and others added 4 commits November 30, 2022 10:31
Co-authored-by: Rajath <92459020+rajaths010494@users.noreply.github.com>
Co-authored-by: Rajath <92459020+rajaths010494@users.noreply.github.com>
Co-authored-by: Rajath <92459020+rajaths010494@users.noreply.github.com>
Co-authored-by: Rajath <92459020+rajaths010494@users.noreply.github.com>
from anyscale.shared_anyscale_utils.utils.byod import BYODInfo
from anyscale.sdk.anyscale_client.models.create_production_job import CreateProductionJob

_POKE_INTERVAL = 60

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the user wanted to poll at a different interval maybe a lesser time or more?

if self.wait_for_completion:
while not AnyscaleSessionCommandSensor(
task_id="wait_session_command",
session_command_id=session_command_response.id,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we not sending the goal_state, here it is complete state?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we are waiting for the session command to return a status_code. If it is not finished, this value is None. Then we compute the execution time and raise an exception if the session_command returned a different status_code than 0.

auth_token=self.auth_token,
).poke(context):

time.sleep(_POKE_INTERVAL)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add poke_interval as the class param, so that the user can modify it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surely. I'll update de PR.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also in other operators as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I updated the base class.

Comment on lines 80 to 81
self.ray_version = ray_version or "1.13.0"
self.python_version = python_version or "py38"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use these as default values also when getting these params.
WDYT?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct. I will add these as the default value for python_version and ray_version.

Copy link

@rajaths010494 rajaths010494 Dec 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also update in other files as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I updated all the operators.

Comment on lines 53 to 56

if goal_state is not None:
return False

Copy link

@rajaths010494 rajaths010494 Dec 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we supposed to check for the current state and goal_state also?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need to check current state and goal state?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the condition:

if goal_state is not None and goal_state != state:
    return False

This way, the sensor checks if the cluster has reached the goal_state.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider that the goal_state returned by the Anyscale API is None when the cluster has reached the goal state.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok got it

self,
*,
auth_token: str,
poke_interval: Optional[int] = 60,
Copy link

@rajaths010494 rajaths010494 Dec 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

param for poke_interval is missing

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added docstring.

@rajaths010494
Copy link

Overall LGTM.

@mrrobby
Copy link
Collaborator

mrrobby commented Dec 8, 2022

Hey guys, just getting to seeing this. This looks really awesome. Would it be wise to change the input of an Auth Token to using a connection ID and a hook to be used in the BaseOperator? Also, can you provide some documentation on usage and how it deviates from running ray code directly in airflow through the provider? Is it appropriate for this to be a separate provider package?

@mlopezgez
Copy link
Author

Hey @mrrobby. Indeed we can create a Hook that uses a connection to Anyscale. My only observation is that it will only have the auth token because it is what the Anyscale Python SDK needs to operate with the Anyscale account.
On the other hand, I would be open to creating another package specific to this Anyscale provider.
Best.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request help wanted Extra attention is needed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants