[core] Add 1s timeout in RPC to CoreWorkerService.NumPendingTasks in GcsJobManager::HandleGetAllJobInfo #46335
…anager::HandleGetAllJobInfo Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Note: we probably also want to add timeouts to all other CoreWorkerInterface APIs at some later point.
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
src/ray/protobuf/gcs.proto
Outdated
@@ -703,6 +703,7 @@ message JobTableData {
   // The optional JobInfo from the Ray Job API.
   optional JobsAPIInfo job_info = 10;
   // Whether this job has running tasks.
+  // In GetAllJobInfo, if GCS can't reach the driver, it will set this field to true.
Actually, on second thought, can we make it optional bool is_running_tasks and leave it unset on timeout? I think this is more explicit.
This will be a breaking change (required -> optional), but that's OK since nobody other than humans reads it. Will do.
@shomilj can you take a look and approve this PR? It adds a 1s timeout when GCS asks each driver core worker about is_running_tasks. If the request times out, GCS returns the field as (not set) in GetAllJobInfo.
Defer to @lanbochen-anyscale
If it's unset, what will the HTTP server return to the caller?
Also, can we add some tests?
.WithField(kLogKeyJobID, job_id)
.WithField(kLogKeyWorkerID, worker_id)
ditto
Co-authored-by: Jiajun Yao <jeromeyjj@gmail.com> Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com>
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
It returns JSON that simply omits the field. See the test: ray/python/ray/tests/test_state_api.py, line 3653 in 4a7c107.
Added a test.
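To make the "unset means omitted" behavior concrete, here is a minimal sketch in plain Python. It is not Ray's actual serialization path: `JobInfo`, `to_json`, and `describe` are hypothetical names, and `None` stands in for an unset optional proto field. The point it illustrates is that an omitted key stays distinguishable from an explicit `false`.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobInfo:
    """Hypothetical stand-in for JobTableData; not the real Ray type."""
    job_id: str
    # None models an unset optional field (the driver could not be reached).
    is_running_tasks: Optional[bool] = None


def to_json(job: JobInfo) -> str:
    """Serialize a job, omitting is_running_tasks when it is unset."""
    payload = {"job_id": job.job_id}
    if job.is_running_tasks is not None:
        payload["is_running_tasks"] = job.is_running_tasks
    return json.dumps(payload)


def describe(job_json: str) -> str:
    """Caller side: tell apart true, false, and unknown."""
    value = json.loads(job_json).get("is_running_tasks")
    if value is None:
        return "unknown"
    return "running" if value else "idle"
```

A caller that reads the key with `.get()` sees three states instead of two, which is what the optional field is meant to convey.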
@@ -703,7 +703,8 @@ message JobTableData {
   // The optional JobInfo from the Ray Job API.
   optional JobsAPIInfo job_info = 10;
   // Whether this job has running tasks.
-  bool is_running_tasks = 11;
+  // In GetAllJobInfo, if GCS can't reach the driver, it will be unset.
+  optional bool is_running_tasks = 11;
Why do we add optional?
When GCS handles JobInfoGcsService.GetAllJobInfo, it sends one RPC to each driver asking for that job's number of running tasks, which determines is_running_tasks. If the RPC times out (1s), GCS has no idea whether the job is running tasks or not (the job may be hung or unresponsive, there may be connection issues, and so on). We want GCS to pass this uncertainty on to the caller, hence the field is optional.
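The behavior described above can be sketched with asyncio. This is only an illustration under stated assumptions, not the GCS implementation (which is C++): the function names, the `drivers` mapping, and the choice to query all drivers concurrently are hypothetical. Each driver is modeled as an async callable returning its number of pending tasks.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional

RPC_TIMEOUT_S = 1.0  # mirrors the PR's 1s timeout; the constant name is made up


async def query_is_running_tasks(
    num_pending_tasks_rpc: Callable[[], Awaitable[int]],
    timeout_s: float = RPC_TIMEOUT_S,
) -> Optional[bool]:
    """Return True/False if the driver answers in time, None otherwise."""
    try:
        num_pending = await asyncio.wait_for(num_pending_tasks_rpc(), timeout=timeout_s)
    except (asyncio.TimeoutError, ConnectionError):
        # Timed out or failed: we do not know, so the field stays unset.
        return None
    return num_pending > 0


async def get_all_job_info(
    drivers: Dict[str, Callable[[], Awaitable[int]]],
    timeout_s: float = RPC_TIMEOUT_S,
) -> Dict[str, Optional[bool]]:
    """Query every driver concurrently so a hung driver costs about timeout_s."""
    job_ids = list(drivers)
    results = await asyncio.gather(
        *(query_is_running_tasks(drivers[job_id], timeout_s) for job_id in job_ids)
    )
    return dict(zip(job_ids, results))


async def healthy_driver() -> int:
    return 3  # has pending tasks


async def idle_driver() -> int:
    return 0  # no pending tasks


async def hung_driver() -> int:
    await asyncio.sleep(3600)  # never answers within the timeout
    return 1


if __name__ == "__main__":
    drivers = {"job-1": healthy_driver, "job-2": idle_driver, "job-3": hung_driver}
    print(asyncio.run(get_all_job_info(drivers, timeout_s=0.05)))
```

Returning `None` rather than guessing `True` or `False` is the design choice under discussion: the caller learns that the answer is unknown instead of receiving a default.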
python/ray/tests/test_state_api.py
Outdated
driver_script = f"""
import ray
import time

ray.init(namespace="test_hang_driver_has_no_is_running_task", address="{address}")
# Now, it's long running...
@ray.remote
def long_running():
    signal_actor = ray.get_actor("signal")
    ray.get(signal_actor.send.remote())
    print("begin infinite sleeping...")
    while True:
        time.sleep(10000)
There is a simpler way to do this. Check out RAY_testing_asio_delay_us.
Hmm, we want it to "keep running and be stuck". If we set RAY_testing_asio_delay_us to infinity, it won't be able to invoke signal_actor.send.remote() in the first place. How about we keep it this way?
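The ordering constraint here (signal first, then get stuck) can be illustrated without Ray. The sketch below is an analogy using threads, and every name in it is hypothetical: the `started` event plays the role of the signal actor, and blocking on `release` plays the role of the infinite sleep. The test waits for the signal before probing, so it knows the driver has reached the stuck state.

```python
import threading
from typing import Optional, Tuple


def start_stuck_driver() -> Tuple[threading.Thread, threading.Event, threading.Event, threading.Event]:
    """Start a thread that signals it is running, then blocks until released."""
    started = threading.Event()
    release = threading.Event()
    reply = threading.Event()  # would be set if the driver ever answered a probe

    def driver() -> None:
        started.set()   # stands in for signal_actor.send.remote()
        release.wait()  # stands in for the infinite sleep; reply is never set

    thread = threading.Thread(target=driver, daemon=True)
    thread.start()
    return thread, started, release, reply


def probe(reply: threading.Event, timeout_s: float) -> Optional[bool]:
    """Return True if the driver replied within timeout_s, else None (unknown)."""
    return True if reply.wait(timeout_s) else None
```

If the driver were delayed from the very beginning, `started` would never be set and the test could not tell a stuck driver from one that never launched.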
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Can we merge?
This might be breaking this Windows test: https://buildkite.com/ray-project/postmerge/builds/5383#0190a249-1219-43be-ac6c-9d07da92f27c/11626-12022. I'm confirming.
…GcsJobManager::HandleGetAllJobInfo (ray-project#46335) Signed-off-by: Ruiyang Wang <rywang014@gmail.com> Signed-off-by: hejialing.hjl <hejialing.hjl@bytedance.com>
The critical Dashboard API GET /api/jobs sends an RPC to JobInfoGcsService.GetAllJobInfo, where the GCS sends an RPC to each driver's CoreWorkerService.NumPendingTasks to ask how many running tasks it currently has. This is not mission critical: in Ray and in Product nobody reads that field, other than in tests. But the Dashboard API itself is mission critical, so we set a 1s timeout on the inner RPC, and if it times out or fails, we just set is_running_tasks to None.