Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Task level config #3689

Merged
merged 24 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 8 additions & 25 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,23 +874,8 @@ def write_cluster_config(
f'open(os.path.expanduser("{constants.SKY_REMOTE_RAY_PORT_FILE}"), "w", encoding="utf-8"))\''
)

# Docker run options
docker_run_options = skypilot_config.get_nested(('docker', 'run_options'),
[])
if isinstance(docker_run_options, str):
docker_run_options = [docker_run_options]
if docker_run_options and isinstance(to_provision.cloud, clouds.Kubernetes):
logger.warning(f'{colorama.Style.DIM}Docker run options are specified, '
'but ignored for Kubernetes: '
f'{" ".join(docker_run_options)}'
f'{colorama.Style.RESET_ALL}')

# Use a tmp file path to avoid incomplete YAML file being re-used in the
# future.
initial_setup_commands = []
if (skypilot_config.get_nested(('nvidia_gpus', 'disable_ecc'), False) and
to_provision.accelerators is not None):
initial_setup_commands.append(constants.DISABLE_GPU_ECC_COMMAND)
tmp_yaml_path = yaml_path + '.tmp'
common_utils.fill_template(
cluster_config_template,
Expand Down Expand Up @@ -922,8 +907,6 @@ def write_cluster_config(
# currently only used by GCP.
'specific_reservations': specific_reservations,

# Initial setup commands.
'initial_setup_commands': initial_setup_commands,
# Conda setup
'conda_installation_commands':
constants.CONDA_INSTALLATION_COMMANDS,
Expand All @@ -935,9 +918,6 @@ def write_cluster_config(
wheel_hash).replace('{cloud}',
str(cloud).lower())),

# Docker
'docker_run_options': docker_run_options,

# Port of Ray (GCS server).
# Ray's default port 6379 is conflicted with Redis.
'ray_port': constants.SKY_REMOTE_RAY_PORT,
Expand Down Expand Up @@ -976,17 +956,20 @@ def write_cluster_config(
output_path=tmp_yaml_path)
config_dict['cluster_name'] = cluster_name
config_dict['ray'] = yaml_path

# Add kubernetes config fields from ~/.sky/config
if isinstance(cloud, clouds.Kubernetes):
kubernetes_utils.combine_pod_config_fields(
tmp_yaml_path,
cluster_config_overrides=to_provision.cluster_config_overrides)
kubernetes_utils.combine_metadata_fields(tmp_yaml_path)

if dryrun:
# If dryrun, return the unfinished tmp yaml path.
config_dict['ray'] = tmp_yaml_path
return config_dict
_add_auth_to_cluster_config(cloud, tmp_yaml_path)

# Add kubernetes config fields from ~/.sky/config
if isinstance(cloud, clouds.Kubernetes):
kubernetes_utils.combine_pod_config_fields(tmp_yaml_path)
kubernetes_utils.combine_metadata_fields(tmp_yaml_path)

# Restore the old yaml content for backward compatibility.
if os.path.exists(yaml_path) and keep_launch_fields_in_existing_config:
with open(yaml_path, 'r', encoding='utf-8') as f:
Expand Down
4 changes: 2 additions & 2 deletions sky/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ def get_all_clouds():
# Use allowed_clouds from config if it exists, otherwise check all clouds.
# Also validate names with get_cloud_tuple.
config_allowed_cloud_names = [
get_cloud_tuple(c)[0] for c in skypilot_config.get_nested(
['allowed_clouds'], get_all_clouds())
get_cloud_tuple(c)[0] for c in skypilot_config.get_nested((
'allowed_clouds',), get_all_clouds())
]
# Use disallowed_cloud_names for logging the clouds that will be disabled
# because they are not included in allowed_clouds in config.yaml.
Expand Down
10 changes: 7 additions & 3 deletions sky/clouds/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,10 @@ def _unsupported_features_for_resources(
# because `skypilot_config` may change for an existing cluster.
# Clusters created with MIG (only GPU clusters) cannot be stopped.
if (skypilot_config.get_nested(
('gcp', 'managed_instance_group'), None) is not None and
resources.accelerators):
('gcp', 'managed_instance_group'),
None,
override_configs=resources.cluster_config_overrides) is not None
and resources.accelerators):
unsupported[clouds.CloudImplementationFeatures.STOP] = (
'Managed Instance Group (MIG) does not support stopping yet.')
unsupported[clouds.CloudImplementationFeatures.SPOT_INSTANCE] = (
Expand Down Expand Up @@ -506,7 +508,9 @@ def make_deploy_resources_variables(
resources_vars['tpu_node_name'] = tpu_node_name

managed_instance_group_config = skypilot_config.get_nested(
('gcp', 'managed_instance_group'), None)
('gcp', 'managed_instance_group'),
None,
override_configs=resources.cluster_config_overrides)
use_mig = managed_instance_group_config is not None
resources_vars['gcp_use_managed_instance_group'] = use_mig
# Convert boolean to 0 or 1 in string, as GCP does not support boolean
Expand Down
22 changes: 12 additions & 10 deletions sky/clouds/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@ class Kubernetes(clouds.Cloud):

SKY_SSH_KEY_SECRET_NAME = 'sky-ssh-keys'
SKY_SSH_JUMP_NAME = 'sky-ssh-jump-pod'
# Timeout for resource provisioning. This timeout determines how long to
# wait for pod to be in pending status before giving up.
# Larger timeout may be required for autoscaling clusters, since autoscaler
# may take some time to provision new nodes.
# Note that this timeout includes time taken by the Kubernetes scheduler
# itself, which can be up to 2-3 seconds.
# For non-autoscaling clusters, we conservatively set this to 10s.
timeout = skypilot_config.get_nested(['kubernetes', 'provision_timeout'],
10)

# Limit the length of the cluster name to avoid exceeding the limit of 63
# characters for Kubernetes resources. We limit to 42 characters (63-21) to
Expand Down Expand Up @@ -309,14 +300,25 @@ def make_deploy_resources_variables(
if resources.use_spot:
spot_label_key, spot_label_value = kubernetes_utils.get_spot_label()

# Timeout for resource provisioning. This timeout determines how long to
# wait for pod to be in pending status before giving up.
# Larger timeout may be required for autoscaling clusters, since
# autoscaler may take some time to provision new nodes.
# Note that this timeout includes time taken by the Kubernetes scheduler
# itself, which can be up to 2-3 seconds.
# For non-autoscaling clusters, we conservatively set this to 10s.
timeout = skypilot_config.get_nested(
('kubernetes', 'provision_timeout'),
10,
override_configs=resources.cluster_config_overrides)
deploy_vars = {
'instance_type': resources.instance_type,
'custom_resources': custom_resources,
'region': region.name,
'cpus': str(cpus),
'memory': str(mem),
'accelerator_count': str(acc_count),
'timeout': str(self.timeout),
'timeout': str(timeout),
'k8s_namespace':
kubernetes_utils.get_current_kube_config_context_namespace(),
'k8s_port_mode': port_mode.value,
Expand Down
22 changes: 16 additions & 6 deletions sky/provision/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1362,9 +1362,10 @@ def merge_dicts(source: Dict[Any, Any], destination: Dict[Any, Any]):
elif isinstance(value, list) and key in destination:
assert isinstance(destination[key], list), \
f'Expected {key} to be a list, found {destination[key]}'
if key == 'containers':
# If the key is 'containers', we take the first and only
# container in the list and merge it.
if key in ['containers', 'imagePullSecrets']:
# If the key is 'containers' or 'imagePullSecrets', we take the
# first and only container/secret in the list and merge it, as
# we only support one container per pod.
assert len(value) == 1, \
f'Expected only one container, found {value}'
merge_dicts(value[0], destination[key][0])
Expand All @@ -1387,7 +1388,10 @@ def merge_dicts(source: Dict[Any, Any], destination: Dict[Any, Any]):
destination[key] = value


def combine_pod_config_fields(cluster_yaml_path: str) -> None:
def combine_pod_config_fields(
cluster_yaml_path: str,
cluster_config_overrides: Dict[str, Any],
) -> None:
"""Adds or updates fields in the YAML with fields from the ~/.sky/config's
kubernetes.pod_spec dict.
This can be used to add fields to the YAML that are not supported by
Expand Down Expand Up @@ -1429,8 +1433,14 @@ def combine_pod_config_fields(cluster_yaml_path: str) -> None:
with open(cluster_yaml_path, 'r', encoding='utf-8') as f:
yaml_content = f.read()
yaml_obj = yaml.safe_load(yaml_content)
# We don't use override_configs in `skypilot_config.get_nested`, as merging
# the pod config requires special handling.
kubernetes_config = skypilot_config.get_nested(('kubernetes', 'pod_config'),
{})
default_value={},
override_configs={})
override_pod_config = (cluster_config_overrides.get('kubernetes', {}).get(
'pod_config', {}))
merge_dicts(override_pod_config, kubernetes_config)

# Merge the kubernetes config into the YAML for both head and worker nodes.
merge_dicts(
Expand Down Expand Up @@ -1554,7 +1564,7 @@ def get_head_pod_name(cluster_name_on_cloud: str):
def get_autoscaler_type(
) -> Optional[kubernetes_enums.KubernetesAutoscalerType]:
"""Returns the autoscaler type by reading from config"""
autoscaler_type = skypilot_config.get_nested(['kubernetes', 'autoscaler'],
autoscaler_type = skypilot_config.get_nested(('kubernetes', 'autoscaler'),
None)
if autoscaler_type is not None:
autoscaler_type = kubernetes_enums.KubernetesAutoscalerType(
Expand Down
53 changes: 50 additions & 3 deletions sky/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class Resources:
"""
# If any fields changed, increment the version. For backward compatibility,
# modify the __setstate__ method to handle the old version.
_VERSION = 18
_VERSION = 19

def __init__(
self,
Expand All @@ -68,6 +68,7 @@ def __init__(
_docker_login_config: Optional[docker_utils.DockerLoginConfig] = None,
_is_image_managed: Optional[bool] = None,
_requires_fuse: Optional[bool] = None,
_cluster_config_overrides: Optional[Dict[str, Any]] = None,
):
"""Initialize a Resources object.

Expand Down Expand Up @@ -218,6 +219,8 @@ def __init__(

self._requires_fuse = _requires_fuse

self._cluster_config_overrides = _cluster_config_overrides

self._set_cpus(cpus)
self._set_memory(memory)
self._set_accelerators(accelerators, accelerator_args)
Expand Down Expand Up @@ -448,6 +451,12 @@ def requires_fuse(self) -> bool:
return False
return self._requires_fuse

@property
def cluster_config_overrides(self) -> Dict[str, Any]:
if self._cluster_config_overrides is None:
return {}
return self._cluster_config_overrides

@requires_fuse.setter
def requires_fuse(self, value: Optional[bool]) -> None:
self._requires_fuse = value
Expand Down Expand Up @@ -1011,13 +1020,39 @@ def make_deploy_variables(self, cluster_name_on_cloud: str,
cloud.make_deploy_resources_variables() method, and the cloud-agnostic
variables are generated by this method.
"""
# Initial setup commands
initial_setup_commands = []
if (skypilot_config.get_nested(
('nvidia_gpus', 'disable_ecc'),
False,
override_configs=self.cluster_config_overrides) and
self.accelerators is not None):
initial_setup_commands = [constants.DISABLE_GPU_ECC_COMMAND]

# Docker run options
docker_run_options = skypilot_config.get_nested(
('docker', 'run_options'),
default_value=[],
override_configs=self.cluster_config_overrides)
if isinstance(docker_run_options, str):
docker_run_options = [docker_run_options]
if docker_run_options and isinstance(self.cloud, clouds.Kubernetes):
logger.warning(
f'{colorama.Style.DIM}Docker run options are specified, '
'but ignored for Kubernetes: '
f'{" ".join(docker_run_options)}'
f'{colorama.Style.RESET_ALL}')

docker_image = self.extract_docker_image()

# Cloud specific variables
cloud_specific_variables = self.cloud.make_deploy_resources_variables(
self, cluster_name_on_cloud, region, zones, dryrun)
docker_image = self.extract_docker_image()
return dict(
cloud_specific_variables,
**{
# Docker config
'docker_run_options': docker_run_options,
# Docker image. The image name used to pull the image, e.g.
# ubuntu:latest.
'docker_image': docker_image,
Expand All @@ -1027,7 +1062,9 @@ def make_deploy_variables(self, cluster_name_on_cloud: str,
constants.DEFAULT_DOCKER_CONTAINER_NAME,
# Docker login config (if any). This helps pull the image from
# private registries.
'docker_login_config': self._docker_login_config
'docker_login_config': self._docker_login_config,
# Initial setup commands.
'initial_setup_commands': initial_setup_commands,
})

def get_reservations_available_resources(self) -> Dict[str, int]:
Expand Down Expand Up @@ -1208,6 +1245,8 @@ def copy(self, **override) -> 'Resources':
_is_image_managed=override.pop('_is_image_managed',
self._is_image_managed),
_requires_fuse=override.pop('_requires_fuse', self._requires_fuse),
_cluster_config_overrides=override.pop(
'_cluster_config_overrides', self._cluster_config_overrides),
)
assert len(override) == 0
return resources
Expand Down Expand Up @@ -1367,6 +1406,8 @@ def _from_yaml_config_single(cls, config: Dict[str, str]) -> 'Resources':
resources_fields['_is_image_managed'] = config.pop(
'_is_image_managed', None)
resources_fields['_requires_fuse'] = config.pop('_requires_fuse', None)
resources_fields['_cluster_config_overrides'] = config.pop(
'_cluster_config_overrides', None)

if resources_fields['cpus'] is not None:
resources_fields['cpus'] = str(resources_fields['cpus'])
Expand Down Expand Up @@ -1410,6 +1451,8 @@ def add_if_not_none(key, value):
if self._docker_login_config is not None:
config['_docker_login_config'] = dataclasses.asdict(
self._docker_login_config)
add_if_not_none('_cluster_config_overrides',
self._cluster_config_overrides)
if self._is_image_managed is not None:
config['_is_image_managed'] = self._is_image_managed
if self._requires_fuse is not None:
Expand Down Expand Up @@ -1525,4 +1568,8 @@ def __setstate__(self, state):
if version < 18:
self._job_recovery = state.pop('_spot_recovery', None)

if version < 19:
self._cluster_config_overrides = state.pop(
'_cluster_config_overrides', None)

self.__dict__.update(state)
12 changes: 12 additions & 0 deletions sky/skylet/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""Constants for SkyPilot."""
from typing import List, Tuple

from packaging import version

import sky
Expand Down Expand Up @@ -261,3 +263,13 @@
# Placeholder for the SSH user in proxy command, replaced when the ssh_user is
# known after provisioning.
SKY_SSH_USER_PLACEHOLDER = 'skypilot:ssh_user'

# The keys that can be overridden in the `~/.sky/config.yaml` file. The
# overrides are specified in task YAMLs.
OVERRIDEABLE_CONFIG_KEYS: List[Tuple[str, ...]] = [
('docker', 'run_options'),
('nvidia_gpus', 'disable_ecc'),
('kubernetes', 'pod_config'),
('kubernetes', 'provision_timeout'),
('gcp', 'managed_instance_group'),
]
Loading
Loading