
xgboost "Allreduce failed" with the dask operator specifically #898

Closed
droctothorpe opened this issue Jul 19, 2024 · 8 comments
@droctothorpe
Contributor

Hey, @jacobtomlinson!

We're trying to run a training job that uses xgboost + dask + distributed + dask-kubernetes. It works fine as long as we provision the Dask cluster with classic KubeCluster. As soon as we provision the cluster with the operator's KubeCluster, the exact same computation fails with the following error:

│ main     raise XGBoostError(py_str(_LIB.XGBGetLastError()))                                                                                                                                                                                                                  
│ main xgboost.core.XGBoostError: [18:22:16] ../rabit/include/rabit/internal/utils.h:86: Allreduce failed                                                                                                                                                                      
│ main time="2024-07-19T18:22:18.609Z" level=info msg="sub-process exited" argo=true error="<nil>"                                                                                                                                                                             
│ main Error: exit status 1

All other dependencies are identical! We're even using the same version of dask-kubernetes (2023.10.0).

I'm sorry for not providing an MCVE. It would be really hard to extricate and replicate the proprietary dataset.

Have you ever run into anything like this before? Any advice would be greatly appreciated. Thank you!

Environment:

  • Dask version: 2023.10.0
  • Python version: 3.9
  • Operating System: linux
  • Install method (conda, pip, source): pip
@droctothorpe
Contributor Author

So it turns out that this error can surface when the memory-limit flag is not passed to the worker command. The operator's make_cluster_spec doesn't set or explicitly expose this argument (link), whereas classic did (link) and even recommended setting it to the same value as requests (and vice versa) (link).

How would you feel about exposing it via make_cluster_spec? Would you prefer that it default to auto? Happy to contribute if you think this contribution makes sense (bandwidth permitting).
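
For reference, a minimal sketch of the classic-style setup referred to above, assuming classic's make_pod_spec helper and its memory_limit/memory_request parameters (the image and sizes here are placeholders, not values from this issue):

from dask_kubernetes.classic import KubeCluster, make_pod_spec

# Classic wired the container memory request/limit and the worker's
# --memory-limit flag together; setting them to the same value is the
# recommendation referenced above.
pod_spec = make_pod_spec(
    image="daskdev/dask:latest",
    memory_limit="4G",
    memory_request="4G",
)
cluster = KubeCluster(pod_spec)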

@jacobtomlinson
Member

jacobtomlinson commented Jul 23, 2024

With the operator the flag is not set, so it defaults to auto. It would be interesting to know what value that results in on your cluster. If you're setting your Pod memory resource limit then I would expect auto to correctly detect that limit and we shouldn't need to specify that. Could you check the worker info and find out what value it detects?

If I remember correctly we added setting things explicitly in the classic mode because in the early days Kubernetes/linux cgroups didn't always correctly report the right memory limit and would report the limit of the whole node. I wouldn't expect that to be an issue these days, but perhaps the problem you ran into here shows that it does.

I'm very conservative about adding new options to make_cluster_spec and KubeCluster because it is a slippery slope. If you want to set that memory limit flag with the operator you can do it like this:

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

cluster_spec = make_cluster_spec(name="foo")
cluster_spec["spec"]["worker"]["spec"]["containers"][0]["args"] += ["--memory-limit", "4GB"]

cluster = KubeCluster(custom_cluster_spec=cluster_spec)
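
A possible follow-on to that snippet, keeping the worker flag and the Pod's memory resources in sync per the recommendation mentioned earlier (the 4G/4GB values are placeholders):

# Pin the container's memory request/limit to match the --memory-limit flag above.
cluster_spec["spec"]["worker"]["spec"]["containers"][0]["resources"] = {
    "requests": {"memory": "4G"},
    "limits": {"memory": "4G"},
}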

I've been thinking lately about making the cluster-customization API a little more pleasant, and I think that would be useful here. I'll open a separate issue for that.

@droctothorpe
Contributor Author

Thanks, @jacobtomlinson!

Could you check the worker info and find out what value it detects?

Dumb question: how do I check what "auto" evaluates to?

@jacobtomlinson
Member

You should be able to see the worker memory on the dashboard.

<your dashboard url>/info/main/workers.html
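
Alternatively, a quick sketch of checking the detected value programmatically, assuming a distributed Client connected to the running cluster:

from dask.distributed import Client

client = Client(cluster)  # `cluster` is the KubeCluster instance

# Each worker entry reports the memory limit that worker detected (in bytes).
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info["memory_limit"])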

@droctothorpe
Contributor Author

Looks like it was inferred to a reasonable value, so setting it manually was a red herring. Thanks, @jacobtomlinson!

@jacobtomlinson
Member

So did you find what was causing the Allreduce failed error?

@droctothorpe
Contributor Author

Turns out it was the cluster autoscaler trying to bin-pack workers, combined with the specific xgboost computation (RFE) not being resilient to worker loss.

Interestingly, we didn't see this behavior in classic when conducting the same computation. Take this with a huge grain of salt, but I'm wondering if it has something to do with these annotations. Were they excluded from the operator spec for any specific reason?

In any case, adding spec["metadata"]["annotations"]["cluster-autoscaler.kubernetes.io/safe-to-evict"] = "false" appears to have resolved the problem. We're adding it to the DaskClusterConfig class I mentioned as a no_evict property and setting it to True by default. Would it make a sensible default for KubeCluster as well? Also spec["metadata"]["annotations"]["karpenter.sh/do-not-evict"] = "true" for Karpenter (it's funny that Karpenter inverts the boolean).
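
For anyone following along, a sketch of wiring that annotation in through make_cluster_spec, assuming (as described above) that annotations on the DaskCluster metadata end up on the worker pods:

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

spec = make_cluster_spec(name="foo")

# Assumption: annotations on the DaskCluster metadata propagate to its pods.
annotations = spec["metadata"].setdefault("annotations", {})
annotations["cluster-autoscaler.kubernetes.io/safe-to-evict"] = "false"
# Karpenter's equivalent (note the inverted boolean):
annotations["karpenter.sh/do-not-evict"] = "true"

cluster = KubeCluster(custom_cluster_spec=spec)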

@jacobtomlinson
Member

Were they excluded from the operator spec for any specific reason?

Those tolerations are only there so that users can set up dedicated Dask nodes by adding the corresponding taint. Features like this were added by users who needed specific functionality. When we wrote the operator we intentionally left out a load of things like this, because it's much easier to just add them yourself now and it keeps the package simpler.

Would it make a sensible default for KubeCluster as well?

Generally Dask workers should be safe to evict. When they receive a SIGTERM, they attempt to hand off any tasks and memory to another worker before shutting down. I think it's actually XGBoost that isn't resilient to this, so I don't think we should add a default just for that single use case.

I think this definitely feeds into #899. Adding a convenience method to make it easy to add annotations would be very helpful.
