Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set up skypilot #273

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
63 changes: 62 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,68 @@ modal serve modal_server_vllm
```shell Python
modal deploy modal_server_vllm
```


## Quick Cloud Deployment

We use [Skypilot](https://skypilot.readthedocs.io/en/latest/) to deploy Functionary models onto various clouds. Currently, we support the following clouds:
- Lambdalabs
- RunPod

### Get Started

1. Install night version of Skypilot (we currently use the 2024-10-23 version):

```bash
pip install skypilot-nightly[all]==1.0.0.dev20241023
```

2. Set up your cloud credentials by following the instructions [here](https://skypilot.readthedocs.io/en/latest/getting-started/installation.html#cloud-account-setup)

### Inference

Use the `deploy_skypilot.py` script to deploy a Functionary model onto various clouds using Skypilot.

#### Usage

1. Run the following command to check the available arguments:
```bash
python deploy_skypilot.py --help
```

2. For Lambdalabs, please expose the port for the server manually first [here](https://cloud.lambdalabs.com/firewall) before running `deploy_skypilot.py`.

3. By default, `args.detach_run` is enabled. To stream the job logs, enter `sky logs <cluster_name>` If you want to run the command in the foreground, please set `args.detach_run` to `False`.

4. SkyPilot does not support stopping instances both Lambdalabs and RunPod currently. To terminate the cluster, run the following command:
```bash
sky down <cluster_name>
```

### Training

Use the `train_skypilot.py` script to train a Functionary model using Skypilot. It performs all the steps mentioned in the [Training](functionary/train/README.md) section. In addition, it automatically uploads the trained model to Hugging Face at the end of the training job.

#### Usage

1. Run the following command to check the available arguments:

```bash
python train_skypilot.py --help
```

2. `train_skypilot.py` accepts the same training command in [Training](functionary/train/README.md) section. Please write the training command into a shell script file, e.g., `train.sh`, and pass it to `train_skypilot.py` using the `--train-command-file` argument.

3. When using Skypilot, we will mount the data files to the cluster. Therefore, you should not specify the `train_data_path` and `eval_data_path` in the training command. Instead, you should specify the paths to the data files in the `train_skypilot.py` script using the `--train-data-path` and `--eval-data-path` arguments.

4. To successfully upload the trained model to Hugging Face post-training as well as log the training process to Weights & Biases, please provide your WandB and Hugging Face tokens to the `train_skypilot.py` script using the `--wandb-token` and `--hf-token` arguments. The HF repository will be in `args.hf_organization` and with the name stated in `output_dir` in the training command file.

**Example Command**

```bash
python train_skypilot.py --cluster-name train-cluster --method lora --cloud runpod --accelerators A100-80GB-SXM --num-accelerators 8 --train-command-file train.sh --train-data-path train_dataset.jsonl --eval-data-path eval_dataset.jsonl --wandb-token <WANDB_TOKEN> --hf-token <HUGGINGFACE_TOKEN> --hf-organization <HF_ORGANIZATION>
```


# Use Cases

Here are a few examples of how you can use this function calling system:
Expand Down
240 changes: 240 additions & 0 deletions deploy_skypilot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
import argparse
import logging

import sky

from functionary.skypilot_utils import (
CLOUD_MAPPING,
check_features,
form_setup,
get_cloud_provider,
)

# Set up logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def form_command() -> str:
"""
Form the command to run the vLLM server.

This function constructs the command string to start the vLLM server
based on the provided arguments. It includes the model, port, host,
and optional parameters like max_model_len and tensor_parallel_size.

Returns:
str: The formatted command string to run the vLLM server.
"""
if args.docker_image:
command = f"sudo docker run --gpus all --shm-size 1g {args.docker_image}"
else:
command = "cd functionary && "
if args.backend == "vllm":
command += f"python server_vllm.py"
else:
command += f"python server_sglang.py"

command += f" --model {args.model} --port {args.port} --host {args.host}"
if args.max_model_len is not None:
if args.backend == "vllm":
command += f" --max-model-len {args.max_model_len}"
else:
command += f" --context-length {args.max_model_len}"
if args.tensor_parallel_size is not None:
command += f" --tensor-parallel-size {args.tensor_parallel_size}"

return command


def main():
"""
Main function to deploy a Functionary model using Skypilot.

This function performs the following steps:
1. Retrieves the cloud provider based on the specified argument.
2. Checks the features supported by the cloud provider.
3. Creates a Skypilot Task with the necessary setup and run commands.
4. Sets the resources for the task, including cloud, accelerators, ports, and disk size.
5. Launches the task using Skypilot, with specified cluster name and optional timeout settings.

Side effects:
- Modifies global 'args' object based on cloud provider features.
- Launches a Skypilot task, which may create or modify cloud resources.

Raises:
Any exceptions raised by Skypilot during task creation or launch.
"""
cloud = get_cloud_provider(cloud_name=args.cloud)
check_features(cloud=cloud, args=args, logger=logger)

envs = {}

if args.docker_image:
envs["DOCKER_USERNAME"] = args.docker_username
envs["DOCKER_PASSWORD"] = args.docker_password
setup = f"docker login --username $DOCKER_USERNAME --password $DOCKER_PASSWORD"
else:
setup = form_setup(args=args)
if args.backend == "vllm":
setup += "pip install -e .[vllm]"
else:
setup += "pip install -e .[sglang] --find-links https://flashinfer.ai/whl/cu121/torch2.4/flashinfer/"

# Authenticate HF if token is provided
if args.hf_token:
envs["HF_TOKEN"] = args.hf_token
if args.docker_image is None:
setup += f" && huggingface-cli login --token $HF_TOKEN"

task = sky.Task(
setup=setup,
run=form_command(),
envs=envs,
workdir=None,
)

task.set_resources(
sky.Resources(
cloud=cloud,
accelerators=f"{args.accelerators}:{args.num_accelerators}",
ports=args.port_to_open,
disk_size=args.disk_size,
region=args.region,
)
)

sky.launch(
task,
cluster_name=args.cluster_name,
idle_minutes_to_autostop=args.idle_timeout,
down=args.down,
detach_run=args.detach_run,
)


def parse_args():
parser = argparse.ArgumentParser(description="Deploy Skypilot")
parser.add_argument(
"--cluster-name", type=str, required=True, help="Name of the cluster"
)
parser.add_argument(
"--docker-image",
type=str,
default=None,
help="Docker image to run. If None, setup and run commands will be used instead.",
)
parser.add_argument(
"--docker-username",
type=str,
default=None,
help="Docker username to use. Only used if docker-image is provided.",
)
parser.add_argument(
"--docker-password",
type=str,
default=None,
help="Docker password to use. Only used if docker-image is provided.",
)
parser.add_argument(
"--commit",
type=str,
default=None,
help="Provide a commit hash to deploy a specific version of Functionary. If None, the latest commit in the main branch will be deployed.",
)
parser.add_argument(
"--backend",
type=str,
choices=["vllm", "sglang"],
default="vllm",
help="Backend inference framework to use. (Currently either `vllm` or `sglang`)",
)
parser.add_argument(
"--cloud",
type=str,
default=None,
help=f"Cloud provider (default: None). Currently only supports {list(CLOUD_MAPPING.keys())}",
)
parser.add_argument(
"--accelerators",
type=str,
default="A100",
help="Accelerator type. Check available types with `sky show-gpus --all`",
)
parser.add_argument(
"--num-accelerators",
type=int,
default=1,
help="Number of accelerators. Check available values with `sky show-gpus --all`",
)
parser.add_argument(
"--disk-size",
type=str,
default=256,
help="The size of the OS disk in GiB. If None, defaults to 256 GiB",
)
parser.add_argument(
"--region", type=str, default=None, help="Region (default: None)"
)
parser.add_argument(
"--idle-timeout",
type=int,
default=-1,
help="Idle timeout in minutes. `-1` means no timeout",
)
parser.add_argument(
"--down",
type=bool,
default=False,
help="Whether to tear down the cluster when timeout",
)
parser.add_argument(
"--model",
type=str,
default="meetkai/functionary-small-v3.2",
help="Model to use",
)
parser.add_argument("--max-model-len", type=int, default=None, help="Model to use")
parser.add_argument(
"--tensor-parallel-size", type=int, default=1, help="Tensor parallel size"
)
parser.add_argument("--port", type=int, default=8000, help="Port to use")
parser.add_argument("--host", type=str, default="0.0.0.0", help="host to use")
parser.add_argument(
"--detach-run",
type=bool,
default=True,
help="Detach run upon job to run server is submitted.",
)
parser.add_argument(
"--hf-token",
type=str,
default=None,
help="Hugging Face token for downloading models. Only use this is the model is gated or private.",
)

args = parser.parse_args()

if args.docker_image:
if args.docker_username is None or args.docker_password is None:
raise ValueError(
"Docker username and password must be provided if docker-image is used."
)
if args.cloud == "runpod":
raise ValueError("Runpod does not support docker images.")

if args.disk_size is None:
args.disk_size = 256
args.disk_size = min(int(args.disk_size), 1024) # Set max disk size to 1TB
if args.idle_timeout == -1:
args.idle_timeout = None
args.port_to_open = args.port

return args


if __name__ == "__main__":
args = parse_args()
main()
82 changes: 82 additions & 0 deletions functionary/skypilot_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import argparse
import logging

import sky

CLOUD_MAPPING = {
"lambda": sky.Lambda(),
"runpod": sky.RunPod(),
}


def get_cloud_provider(cloud_name: str) -> sky.clouds.Cloud:
"""
Get the cloud provider object based on the given cloud name.

Args:
cloud_name (str): The name of the cloud provider.

Returns:
sky.clouds.Cloud: The corresponding cloud provider object.

Raises:
AssertionError: If an invalid cloud provider name is given.
"""
assert cloud_name.lower() in CLOUD_MAPPING, f"Invalid cloud provider: {cloud_name}"
return CLOUD_MAPPING[cloud_name.lower()]


def check_features(
cloud: sky.clouds.Cloud, args: argparse.Namespace, logger: logging.Logger
):
"""
Check if the cloud provider supports certain features and update arguments accordingly.

This function checks if the given cloud provider supports stopping instances and opening ports.
If these features are not supported, it updates the corresponding arguments and logs warnings.

Args:
cloud (sky.clouds.Cloud): The cloud provider object to check.
args (argparse.Namespace): The parsed command line arguments to update.
logger (logging.Logger): Logger instance for outputting warnings.

Side effects:
- Modifies args.idle_timeout and args.down if stopping is not supported
- Modifies args.port_to_open if opening ports is not supported
- Logs warnings for unsupported features
"""
unsupported_features = cloud._unsupported_features_for_resources(None)

if sky.clouds.CloudImplementationFeatures.STOP in unsupported_features:
logger.warning(
f"Stopping is not supported on {repr(cloud)}. Setting args.idle_timeout and args.down to None."
)
args.idle_timeout = None
args.down = None
if sky.clouds.CloudImplementationFeatures.OPEN_PORTS in unsupported_features:
logger.warning(
f"Opening port is not supported on {repr(cloud)}. Setting args.port_to_open to None. Please open port manually."
)
args.port_to_open = None


def form_setup(args: argparse.Namespace) -> str:
"""
Form the setup command string for initializing the environment.

This function constructs the setup command string that handles cloning the repository
and checking out a specific commit if specified.

Args:
args (argparse.Namespace): The parsed command line arguments containing:
- commit (str, optional): Git commit hash to checkout. If None, uses latest main branch.

Returns:
str: The formatted setup command string.
"""
setup = "if [ ! -d 'functionary' ]; then git clone https://github.com/meetkai/functionary.git && cd functionary"
if args.commit is not None:
setup += f" && git checkout {args.commit}"
setup += "; else cd functionary; fi && "

return setup
Loading
Loading