From 9c6afb4fab5772347d34a6c9bb15bc1736161365 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sat, 24 Aug 2024 04:38:22 +0000 Subject: [PATCH 01/21] add autodown annotations to the k8s pod --- sky/backends/backend.py | 9 +++++++-- sky/backends/backend_utils.py | 9 +++++++-- sky/backends/cloud_vm_ray_backend.py | 18 ++++++++++++++---- sky/backends/local_docker_backend.py | 6 ++++-- sky/core.py | 4 +++- sky/execution.py | 2 ++ sky/templates/kubernetes-ray.yml.j2 | 8 ++++++-- 7 files changed, 43 insertions(+), 13 deletions(-) diff --git a/sky/backends/backend.py b/sky/backends/backend.py index 10389cf691e..69083b9742b 100644 --- a/sky/backends/backend.py +++ b/sky/backends/backend.py @@ -48,6 +48,8 @@ def provision( to_provision: Optional['resources.Resources'], dryrun: bool, stream_logs: bool, + down: bool = False, + idle_minutes_to_autostop: Optional[int] = None, cluster_name: Optional[str] = None, retry_until_up: bool = False) -> Optional[_ResourceHandleType]: if cluster_name is None: @@ -55,7 +57,8 @@ def provision( usage_lib.record_cluster_name_for_current_operation(cluster_name) usage_lib.messages.usage.update_actual_task(task) return self._provision(task, to_provision, dryrun, stream_logs, - cluster_name, retry_until_up) + cluster_name, down, idle_minutes_to_autostop, + retry_until_up) @timeline.event @usage_lib.messages.usage.update_runtime('sync_workdir') @@ -127,7 +130,9 @@ def _provision( dryrun: bool, stream_logs: bool, cluster_name: str, - retry_until_up: bool = False) -> Optional[_ResourceHandleType]: + retry_until_up: bool = False, + down: bool = False, + idle_minutes_to_autostop: Optional[int] = None) -> Optional[_ResourceHandleType]: raise NotImplementedError def _sync_workdir(self, handle: _ResourceHandleType, workdir: Path) -> None: diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 9986f93275a..f1d383f9b51 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -758,7 +758,9 @@ def write_cluster_config( region: clouds.Region, zones: Optional[List[clouds.Zone]] = None, dryrun: bool = False, - keep_launch_fields_in_existing_config: bool = True) -> Dict[str, str]: + keep_launch_fields_in_existing_config: bool = True, + down: bool = False, + idle_minutes_to_autostop: Optional[int] = None,) -> Dict[str, str]: """Fills in cluster configuration templates and writes them out. Returns: {provisioner: path to yaml, the provisioning spec}. @@ -914,7 +916,10 @@ def write_cluster_config( # The reservation pools that specified by the user. This is # currently only used by GCP. 'specific_reservations': specific_reservations, - + # These two are currently only used by Kubernetes to create + # annotations for the pod. + 'down': down, + 'idle_minutes_to_autostop' : idle_minutes_to_autostop, # Conda setup 'conda_installation_commands': constants.CONDA_INSTALLATION_COMMANDS, diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 9545436f05c..19315a38b48 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -1300,6 +1300,8 @@ def _retry_zones( prev_cluster_status: Optional[status_lib.ClusterStatus], prev_handle: Optional['CloudVmRayResourceHandle'], prev_cluster_ever_up: bool, + down: bool = False, + idle_minutes_to_autostop: Optional[int] = None, ) -> Dict[str, Any]: """The provision retry loop.""" style = colorama.Style @@ -1395,7 +1397,9 @@ def _retry_zones( region=region, zones=zones, dryrun=dryrun, - keep_launch_fields_in_existing_config=cluster_exists) + keep_launch_fields_in_existing_config=cluster_exists, + down=down, + idle_minutes_to_autostop=idle_minutes_to_autostop) except exceptions.ResourcesUnavailableError as e: # Failed due to catalog issue, e.g. image not found, or # GPUs are requested in a Kubernetes cluster but the cluster @@ -1914,6 +1918,8 @@ def provision_with_retries( to_provision_config: ToProvisionConfig, dryrun: bool, stream_logs: bool, + down: bool = False, + idle_minutes_to_autostop: Optional[int] = None, ) -> Dict[str, Any]: """Provision with retries for all launchable resources.""" cluster_name = to_provision_config.cluster_name @@ -1972,7 +1978,9 @@ def provision_with_retries( cloud_user_identity=cloud_user, prev_cluster_status=prev_cluster_status, prev_handle=prev_handle, - prev_cluster_ever_up=prev_cluster_ever_up) + prev_cluster_ever_up=prev_cluster_ever_up, + down=down, + idle_minutes_to_autostop=idle_minutes_to_autostop) if dryrun: return config_dict except (exceptions.InvalidClusterNameError, @@ -2685,7 +2693,9 @@ def _provision( dryrun: bool, stream_logs: bool, cluster_name: str, - retry_until_up: bool = False) -> Optional[CloudVmRayResourceHandle]: + retry_until_up: bool = False, + down: bool = False, + idle_minutes_to_autostop: Optional[int] = None) -> Optional[CloudVmRayResourceHandle]: """Provisions using 'ray up'. Raises: @@ -2768,7 +2778,7 @@ def _provision( wheel_hash, blocked_resources=task.blocked_resources) config_dict = retry_provisioner.provision_with_retries( - task, to_provision_config, dryrun, stream_logs) + task, to_provision_config, dryrun, stream_logs, down, idle_minutes_to_autostop) break except exceptions.ResourcesUnavailableError as e: # Do not remove the stopped cluster from the global state diff --git a/sky/backends/local_docker_backend.py b/sky/backends/local_docker_backend.py index 78619943e8c..f55dd142b43 100644 --- a/sky/backends/local_docker_backend.py +++ b/sky/backends/local_docker_backend.py @@ -136,14 +136,16 @@ def _provision( dryrun: bool, stream_logs: bool, cluster_name: str, - retry_until_up: bool = False + retry_until_up: bool = False, + down: bool = False, + idle_minutes_to_autostop: Optional[int] = None ) -> Optional[LocalDockerResourceHandle]: """Builds docker image for the task and returns cluster name as handle. Since resource demands are ignored, There's no provisioning in local docker. """ - del to_provision # Unused + del to_provision, down, idle_minutes_to_autostop # Unused assert task.name is not None, ('Task name cannot be None - have you ' 'specified a task name?') if dryrun: diff --git a/sky/core.py b/sky/core.py index 85f81ac6c7a..cb6b4e8cd6d 100644 --- a/sky/core.py +++ b/sky/core.py @@ -229,7 +229,9 @@ def _start( dryrun=False, stream_logs=True, cluster_name=cluster_name, - retry_until_up=retry_until_up) + retry_until_up=retry_until_up, + down=down, + idle_minutes_to_autostop=idle_minutes_to_autostop) storage_mounts = backend.get_storage_mounts_metadata(handle.cluster_name) # Passing all_file_mounts as None ensures the local source set in Storage # to not redundantly sync source to the bucket. diff --git a/sky/execution.py b/sky/execution.py index 1f6bd09f9c3..985eccd5314 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -273,6 +273,8 @@ def _execute( handle = backend.provision(task, task.best_resources, dryrun=dryrun, + down=down, + idle_minutes_to_autostop=idle_minutes_to_autostop, stream_logs=stream_logs, cluster_name=cluster_name, retry_until_up=retry_until_up) diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 index bd4bafd43d5..5e623d3e9b1 100644 --- a/sky/templates/kubernetes-ray.yml.j2 +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -266,11 +266,15 @@ available_node_types: {%- for label_key, label_value in labels.items() %} {{ label_key }}: {{ label_value|tojson }} {%- endfor %} - {% if k8s_fuse_device_required %} annotations: + skypilot.co/autodown: "{{ down }}" + {% if idle_minutes_to_autostop %} + skypilot.co/idle_minutes_to_autostop: "{{ idle_minutes_to_autostop }}" + {% endif %} + {% if k8s_fuse_device_required %} # Required for FUSE mounting to access /dev/fuse container.apparmor.security.beta.kubernetes.io/ray-node: unconfined - {% endif %} + {% endif %} spec: # serviceAccountName: skypilot-service-account serviceAccountName: {{k8s_service_account_name}} From 2ff9e4d95bc9aa37044d388a657278835e39051c Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sat, 24 Aug 2024 23:07:45 +0000 Subject: [PATCH 02/21] revert kubernetes ray template --- sky/templates/kubernetes-ray.yml.j2 | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 index 5e623d3e9b1..bd4bafd43d5 100644 --- a/sky/templates/kubernetes-ray.yml.j2 +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -266,15 +266,11 @@ available_node_types: {%- for label_key, label_value in labels.items() %} {{ label_key }}: {{ label_value|tojson }} {%- endfor %} + {% if k8s_fuse_device_required %} annotations: - skypilot.co/autodown: "{{ down }}" - {% if idle_minutes_to_autostop %} - skypilot.co/idle_minutes_to_autostop: "{{ idle_minutes_to_autostop }}" - {% endif %} - {% if k8s_fuse_device_required %} # Required for FUSE mounting to access /dev/fuse container.apparmor.security.beta.kubernetes.io/ray-node: unconfined - {% endif %} + {% endif %} spec: # serviceAccountName: skypilot-service-account serviceAccountName: {{k8s_service_account_name}} From 4da096b44045bbef09bbb0662fb3675227a79983 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sat, 24 Aug 2024 23:09:16 +0000 Subject: [PATCH 03/21] revert backend_utils from invasive approach --- sky/backends/backend_utils.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index f1d383f9b51..280cf153847 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -758,9 +758,7 @@ def write_cluster_config( region: clouds.Region, zones: Optional[List[clouds.Zone]] = None, dryrun: bool = False, - keep_launch_fields_in_existing_config: bool = True, - down: bool = False, - idle_minutes_to_autostop: Optional[int] = None,) -> Dict[str, str]: + keep_launch_fields_in_existing_config: bool = True) -> Dict[str, str]: """Fills in cluster configuration templates and writes them out. Returns: {provisioner: path to yaml, the provisioning spec}. @@ -916,10 +914,6 @@ def write_cluster_config( # The reservation pools that specified by the user. This is # currently only used by GCP. 'specific_reservations': specific_reservations, - # These two are currently only used by Kubernetes to create - # annotations for the pod. - 'down': down, - 'idle_minutes_to_autostop' : idle_minutes_to_autostop, # Conda setup 'conda_installation_commands': constants.CONDA_INSTALLATION_COMMANDS, From 0e75c063344a31f3d7160ed24d8884ca98b6c8a7 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sat, 24 Aug 2024 23:09:54 +0000 Subject: [PATCH 04/21] nit --- sky/backends/backend_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 280cf153847..9986f93275a 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -914,6 +914,7 @@ def write_cluster_config( # The reservation pools that specified by the user. This is # currently only used by GCP. 'specific_reservations': specific_reservations, + # Conda setup 'conda_installation_commands': constants.CONDA_INSTALLATION_COMMANDS, From 54c764cc493959e3299b119a34000a078cd0cbc6 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sat, 24 Aug 2024 23:13:01 +0000 Subject: [PATCH 05/21] revert from invasive approaches --- sky/backends/backend.py | 5 +---- sky/backends/local_docker_backend.py | 6 ++---- sky/core.py | 4 +--- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/sky/backends/backend.py b/sky/backends/backend.py index 69083b9742b..99f6485a47a 100644 --- a/sky/backends/backend.py +++ b/sky/backends/backend.py @@ -48,8 +48,6 @@ def provision( to_provision: Optional['resources.Resources'], dryrun: bool, stream_logs: bool, - down: bool = False, - idle_minutes_to_autostop: Optional[int] = None, cluster_name: Optional[str] = None, retry_until_up: bool = False) -> Optional[_ResourceHandleType]: if cluster_name is None: @@ -57,8 +55,7 @@ def provision( usage_lib.record_cluster_name_for_current_operation(cluster_name) usage_lib.messages.usage.update_actual_task(task) return self._provision(task, to_provision, dryrun, stream_logs, - cluster_name, down, idle_minutes_to_autostop, - retry_until_up) + cluster_name, retry_until_up) @timeline.event @usage_lib.messages.usage.update_runtime('sync_workdir') diff --git a/sky/backends/local_docker_backend.py b/sky/backends/local_docker_backend.py index f55dd142b43..78619943e8c 100644 --- a/sky/backends/local_docker_backend.py +++ b/sky/backends/local_docker_backend.py @@ -136,16 +136,14 @@ def _provision( dryrun: bool, stream_logs: bool, cluster_name: str, - retry_until_up: bool = False, - down: bool = False, - idle_minutes_to_autostop: Optional[int] = None + retry_until_up: bool = False ) -> Optional[LocalDockerResourceHandle]: """Builds docker image for the task and returns cluster name as handle. Since resource demands are ignored, There's no provisioning in local docker. """ - del to_provision, down, idle_minutes_to_autostop # Unused + del to_provision # Unused assert task.name is not None, ('Task name cannot be None - have you ' 'specified a task name?') if dryrun: diff --git a/sky/core.py b/sky/core.py index cb6b4e8cd6d..85f81ac6c7a 100644 --- a/sky/core.py +++ b/sky/core.py @@ -229,9 +229,7 @@ def _start( dryrun=False, stream_logs=True, cluster_name=cluster_name, - retry_until_up=retry_until_up, - down=down, - idle_minutes_to_autostop=idle_minutes_to_autostop) + retry_until_up=retry_until_up) storage_mounts = backend.get_storage_mounts_metadata(handle.cluster_name) # Passing all_file_mounts as None ensures the local source set in Storage # to not redundantly sync source to the bucket. From 8d740ab51f8fab4ade9cf2507892806bb6a2ed24 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sat, 24 Aug 2024 23:14:29 +0000 Subject: [PATCH 06/21] revert --- sky/backends/backend.py | 4 +--- sky/execution.py | 2 -- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/sky/backends/backend.py b/sky/backends/backend.py index 99f6485a47a..10389cf691e 100644 --- a/sky/backends/backend.py +++ b/sky/backends/backend.py @@ -127,9 +127,7 @@ def _provision( dryrun: bool, stream_logs: bool, cluster_name: str, - retry_until_up: bool = False, - down: bool = False, - idle_minutes_to_autostop: Optional[int] = None) -> Optional[_ResourceHandleType]: + retry_until_up: bool = False) -> Optional[_ResourceHandleType]: raise NotImplementedError def _sync_workdir(self, handle: _ResourceHandleType, workdir: Path) -> None: diff --git a/sky/execution.py b/sky/execution.py index 985eccd5314..1f6bd09f9c3 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -273,8 +273,6 @@ def _execute( handle = backend.provision(task, task.best_resources, dryrun=dryrun, - down=down, - idle_minutes_to_autostop=idle_minutes_to_autostop, stream_logs=stream_logs, cluster_name=cluster_name, retry_until_up=retry_until_up) From 0af73d8ad2a69b2db8a44e8f9774fc78081b6df7 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sun, 25 Aug 2024 01:08:55 +0000 Subject: [PATCH 07/21] updated approach --- sky/backends/cloud_vm_ray_backend.py | 26 +++--- sky/provision/kubernetes/instance.py | 70 ++++---------- sky/provision/kubernetes/utils.py | 133 +++++++++++++++++++++++++++ 3 files changed, 161 insertions(+), 68 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 19315a38b48..afd5e391d7e 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -48,6 +48,7 @@ from sky.provision import instance_setup from sky.provision import metadata_utils from sky.provision import provisioner +from sky.provision.kubernetes import utils as kubernetes_utils from sky.skylet import autostop_lib from sky.skylet import constants from sky.skylet import job_lib @@ -1300,8 +1301,6 @@ def _retry_zones( prev_cluster_status: Optional[status_lib.ClusterStatus], prev_handle: Optional['CloudVmRayResourceHandle'], prev_cluster_ever_up: bool, - down: bool = False, - idle_minutes_to_autostop: Optional[int] = None, ) -> Dict[str, Any]: """The provision retry loop.""" style = colorama.Style @@ -1397,9 +1396,7 @@ def _retry_zones( region=region, zones=zones, dryrun=dryrun, - keep_launch_fields_in_existing_config=cluster_exists, - down=down, - idle_minutes_to_autostop=idle_minutes_to_autostop) + keep_launch_fields_in_existing_config=cluster_exists) except exceptions.ResourcesUnavailableError as e: # Failed due to catalog issue, e.g. image not found, or # GPUs are requested in a Kubernetes cluster but the cluster @@ -1918,8 +1915,6 @@ def provision_with_retries( to_provision_config: ToProvisionConfig, dryrun: bool, stream_logs: bool, - down: bool = False, - idle_minutes_to_autostop: Optional[int] = None, ) -> Dict[str, Any]: """Provision with retries for all launchable resources.""" cluster_name = to_provision_config.cluster_name @@ -1978,9 +1973,7 @@ def provision_with_retries( cloud_user_identity=cloud_user, prev_cluster_status=prev_cluster_status, prev_handle=prev_handle, - prev_cluster_ever_up=prev_cluster_ever_up, - down=down, - idle_minutes_to_autostop=idle_minutes_to_autostop) + prev_cluster_ever_up=prev_cluster_ever_up) if dryrun: return config_dict except (exceptions.InvalidClusterNameError, @@ -2693,9 +2686,7 @@ def _provision( dryrun: bool, stream_logs: bool, cluster_name: str, - retry_until_up: bool = False, - down: bool = False, - idle_minutes_to_autostop: Optional[int] = None) -> Optional[CloudVmRayResourceHandle]: + retry_until_up: bool = False) -> Optional[CloudVmRayResourceHandle]: """Provisions using 'ray up'. Raises: @@ -2778,7 +2769,7 @@ def _provision( wheel_hash, blocked_resources=task.blocked_resources) config_dict = retry_provisioner.provision_with_retries( - task, to_provision_config, dryrun, stream_logs, down, idle_minutes_to_autostop) + task, to_provision_config, dryrun, stream_logs) break except exceptions.ResourcesUnavailableError as e: # Do not remove the stopped cluster from the global state @@ -4137,6 +4128,13 @@ def set_autostop(self, global_user_state.set_cluster_autostop_value( handle.cluster_name, idle_minutes_to_autostop, down) + # Add/Remove autodown annotations from Kubernetes pods. + if isinstance(handle.launched_resources.cloud, clouds.Kubernetes): + kubernetes_utils.set_autodown_annotations( + handle=handle, + idle_minutes_to_autostop=idle_minutes_to_autostop, + down=down) + def is_definitely_autostopping(self, handle: CloudVmRayResourceHandle, stream_logs: bool = True) -> bool: diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py index a233b7a944f..4f0deeabc05 100644 --- a/sky/provision/kubernetes/instance.py +++ b/sky/provision/kubernetes/instance.py @@ -28,48 +28,6 @@ TAG_SKYPILOT_CLUSTER_NAME = 'skypilot-cluster-name' TAG_POD_INITIALIZED = 'skypilot-initialized' -POD_STATUSES = { - 'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Terminating' -} - - -def to_label_selector(tags): - label_selector = '' - for k, v in tags.items(): - if label_selector != '': - label_selector += ',' - label_selector += '{}={}'.format(k, v) - return label_selector - - -def _get_namespace(provider_config: Dict[str, Any]) -> str: - return provider_config.get( - 'namespace', - kubernetes_utils.get_current_kube_config_context_namespace()) - - -def _filter_pods(namespace: str, tag_filters: Dict[str, str], - status_filters: Optional[List[str]]) -> Dict[str, Any]: - """Filters pods by tags and status.""" - non_included_pod_statuses = POD_STATUSES.copy() - - field_selector = '' - if status_filters is not None: - non_included_pod_statuses -= set(status_filters) - field_selector = ','.join( - [f'status.phase!={status}' for status in non_included_pod_statuses]) - - label_selector = to_label_selector(tag_filters) - pod_list = kubernetes.core_api().list_namespaced_pod( - namespace, field_selector=field_selector, label_selector=label_selector) - - # Don't return pods marked for deletion, - # i.e. pods with non-null metadata.DeletionTimestamp. - pods = [ - pod for pod in pod_list.items if pod.metadata.deletion_timestamp is None - ] - return {pod.metadata.name: pod for pod in pods} - def _get_head_pod_name(pods: Dict[str, Any]) -> Optional[str]: head_pod_name = None @@ -464,7 +422,7 @@ def _create_pods(region: str, cluster_name_on_cloud: str, config: common.ProvisionConfig) -> common.ProvisionRecord: """Create pods based on the config.""" provider_config = config.provider_config - namespace = _get_namespace(provider_config) + namespace = kubernetes_utils.get_namespace(provider_config) pod_spec = copy.deepcopy(config.node_config) tags = { TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud, @@ -477,7 +435,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str, pod_spec['metadata']['labels'].update( {TAG_SKYPILOT_CLUSTER_NAME: cluster_name_on_cloud}) - terminating_pods = _filter_pods(namespace, tags, ['Terminating']) + terminating_pods = kubernetes_utils.filter_pods(namespace, tags, + ['Terminating']) start_time = time.time() while (len(terminating_pods) > 0 and time.time() - start_time < _TIMEOUT_FOR_POD_TERMINATION): @@ -485,7 +444,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str, 'terminating pods. Waiting them to finish: ' f'{list(terminating_pods.keys())}') time.sleep(POLL_INTERVAL) - terminating_pods = _filter_pods(namespace, tags, ['Terminating']) + terminating_pods = kubernetes_utils.filter_pods(namespace, tags, + ['Terminating']) if len(terminating_pods) > 0: # If there are still terminating pods, we force delete them. @@ -502,7 +462,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str, _request_timeout=config_lib.DELETION_TIMEOUT, grace_period_seconds=0) - running_pods = _filter_pods(namespace, tags, ['Pending', 'Running']) + running_pods = kubernetes_utils.filter_pods(namespace, tags, + ['Pending', 'Running']) head_pod_name = _get_head_pod_name(running_pods) logger.debug(f'Found {len(running_pods)} existing pods: ' f'{list(running_pods.keys())}') @@ -579,7 +540,7 @@ def _create_pods(region: str, cluster_name_on_cloud: str, if head_pod_name is None: head_pod_name = pod.metadata.name - wait_pods_dict = _filter_pods(namespace, tags, ['Pending']) + wait_pods_dict = kubernetes_utils.filter_pods(namespace, tags, ['Pending']) wait_pods = list(wait_pods_dict.values()) networking_mode = network_utils.get_networking_mode( @@ -609,8 +570,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str, logger.debug(f'run_instances: all pods are scheduled and running: ' f'{list(wait_pods_dict.keys())}') - running_pods = _filter_pods(namespace, tags, ['Running']) - initialized_pods = _filter_pods(namespace, { + running_pods = kubernetes_utils.filter_pods(namespace, tags, ['Running']) + initialized_pods = kubernetes_utils.filter_pods(namespace, { TAG_POD_INITIALIZED: 'true', **tags }, ['Running']) @@ -712,11 +673,11 @@ def terminate_instances( worker_only: bool = False, ) -> None: """See sky/provision/__init__.py""" - namespace = _get_namespace(provider_config) + namespace = kubernetes_utils.get_namespace(provider_config) tag_filters = { TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud, } - pods = _filter_pods(namespace, tag_filters, None) + pods = kubernetes_utils.filter_pods(namespace, tag_filters, None) def _is_head(pod) -> bool: return pod.metadata.labels[constants.TAG_RAY_NODE_KIND] == 'head' @@ -734,12 +695,13 @@ def get_cluster_info( provider_config: Optional[Dict[str, Any]] = None) -> common.ClusterInfo: del region # unused assert provider_config is not None - namespace = _get_namespace(provider_config) + namespace = kubernetes_utils.get_namespace(provider_config) tag_filters = { TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud, } - running_pods = _filter_pods(namespace, tag_filters, ['Running']) + running_pods = kubernetes_utils.filter_pods(namespace, tag_filters, + ['Running']) pods: Dict[str, List[common.InstanceInfo]] = {} head_pod_name = None @@ -858,7 +820,7 @@ def get_command_runners( """Get a command runner for the given cluster.""" assert cluster_info.provider_config is not None, cluster_info instances = cluster_info.instances - namespace = _get_namespace(cluster_info.provider_config) + namespace = kubernetes_utils.get_namespace(cluster_info.provider_config) node_list = [] if cluster_info.head_instance_id is not None: node_list = [(namespace, cluster_info.head_instance_id)] diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 7ad3d72e46b..938c66a68ad 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -6,10 +6,12 @@ import re import shutil import subprocess +import typing from typing import Any, Dict, List, Optional, Set, Tuple, Union from urllib.parse import urlparse import jinja2 +from kubernetes.client import V1Pod import yaml import sky @@ -17,6 +19,7 @@ from sky import sky_logging from sky import skypilot_config from sky.adaptors import kubernetes +from sky.provision import constants as provision_constants from sky.provision.kubernetes import network_utils from sky.skylet import constants from sky.utils import common_utils @@ -25,6 +28,9 @@ from sky.utils import schemas from sky.utils import ux_utils +if typing.TYPE_CHECKING: + from sky import backends + # TODO(romilb): Move constants to constants.py DEFAULT_NAMESPACE = 'default' @@ -60,6 +66,16 @@ PORT_FORWARD_PROXY_CMD_TEMPLATE = 'kubernetes-port-forward-proxy-command.sh' PORT_FORWARD_PROXY_CMD_PATH = '~/.sky/kubernetes-port-forward-proxy-command.sh' +POD_STATUSES = { + 'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Terminating' +} +AUTODOWN_ANNOTATIONS_KEY = 'skypilot.co/autodown' +IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY = ( + 'skypilot.co/idle_minutes_to_autostop') +KUBERNETES_AUTODOWN_ANNOTATIONS_KEYS = [ + AUTODOWN_ANNOTATIONS_KEY, IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY +] + logger = sky_logging.init_logger(__name__) @@ -1722,3 +1738,120 @@ def get_kubernetes_node_info() -> Dict[str, KubernetesNodeInfo]: free={'nvidia.com/gpu': int(accelerators_available)}) return node_info_dict + + +def to_label_selector(tags): + label_selector = '' + for k, v in tags.items(): + if label_selector != '': + label_selector += ',' + label_selector += '{}={}'.format(k, v) + return label_selector + + +def get_namespace(provider_config: Dict[str, Any]) -> str: + return provider_config.get('namespace', + get_current_kube_config_context_namespace()) + + +def filter_pods(namespace: str, + tag_filters: Dict[str, str], + status_filters: Optional[List[str]] = None) -> Dict[str, Any]: + """Filters pods by tags and status.""" + non_included_pod_statuses = POD_STATUSES.copy() + + field_selector = '' + if status_filters is not None: + non_included_pod_statuses -= set(status_filters) + field_selector = ','.join( + [f'status.phase!={status}' for status in non_included_pod_statuses]) + + label_selector = to_label_selector(tag_filters) + pod_list = kubernetes.core_api().list_namespaced_pod( + namespace, field_selector=field_selector, label_selector=label_selector) + + # Don't return pods marked for deletion, + # i.e. pods with non-null metadata.DeletionTimestamp. + pods = [ + pod for pod in pod_list.items if pod.metadata.deletion_timestamp is None + ] + return {pod.metadata.name: pod for pod in pods} + + +def _remove_pod_annotations(pod: V1Pod, annotation_key: str, + namespace: str) -> None: + """Removes specified Annotation from a Kubernetes pod.""" + try: + # Remove the specified annotations + if pod.metadata.annotations: + if annotation_key in pod.metadata.annotations: + # Patch the pod with the updated metadata + body = {'metadata': {'annotations': {annotation_key: None}}} + kubernetes.core_api().patch_namespaced_pod( + name=pod.metadata.name, + namespace=namespace, + body=body, + _request_timeout=kubernetes.API_TIMEOUT) + + except kubernetes.api_exception() as e: + if e.status == 404: + logger.warning(f'Pod {pod.metadata.name} not found in namespace ' + f'{namespace} while trying to remove Annotations ' + f'{annotation_key}.') + else: + raise + + +def _add_pod_annotations(pod: V1Pod, annotation: Dict[str, str], + namespace: str) -> None: + """Adds Annotations on a specified Kubernetes pod.""" + try: + # Patch the pod with the updated metadata + body = {'metadata': {'annotations': annotation}} + kubernetes.core_api().patch_namespaced_pod( + name=pod.metadata.name, + namespace=namespace, + body=body, + _request_timeout=kubernetes.API_TIMEOUT) + + except kubernetes.api_exception() as e: + if e.status == 404: + logger.warning(f'Pod {pod.metadata.name} not found in namespace ' + f'{namespace} while trying to remove Annotations ' + f'{annotation}.') + else: + raise + + +def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle', + idle_minutes_to_autostop: Optional[int], + down: bool = False) -> None: + """Adds or removes Annotations of autodown on Kubernetes pods. """ + annotation_keys = KUBERNETES_AUTODOWN_ANNOTATIONS_KEYS + tags = { + provision_constants.TAG_RAY_CLUSTER_NAME: handle.cluster_name_on_cloud, + } + ray_config = common_utils.read_yaml(handle.cluster_yaml) + provider_config = ray_config['provider'] + namespace = get_namespace(provider_config) + # Pods are already in 'Running' status + running_pods = filter_pods(namespace, tags) + for _, pod in running_pods.items(): + if down: + for annotation_key in annotation_keys: + annotation: Dict[str, str] = {} + if annotation_key == IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY: + annotation.update( + {annotation_key: str(idle_minutes_to_autostop)}) + else: # annotation_key == AUTODOWN_ANNOTATIONS_KEY + annotation.update({annotation_key: str(True)}) + _add_pod_annotations(pod=pod, + annotation=annotation, + namespace=namespace) + + elif (idle_minutes_to_autostop is not None and + idle_minutes_to_autostop < 0): + for annotation_key in annotation_keys: + _remove_pod_annotations(pod=pod, + annotation_key=annotation_key, + namespace=namespace) From d0bebd6e3efd262d2075a95eee5b8d30ef9df98c Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sun, 25 Aug 2024 01:16:20 +0000 Subject: [PATCH 08/21] nit --- sky/provision/kubernetes/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 938c66a68ad..070b1e64985 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -1804,7 +1804,7 @@ def _remove_pod_annotations(pod: V1Pod, annotation_key: str, def _add_pod_annotations(pod: V1Pod, annotation: Dict[str, str], namespace: str) -> None: - """Adds Annotations on a specified Kubernetes pod.""" + """Adds specified Annotations on a Kubernetes pod.""" try: # Patch the pod with the updated metadata body = {'metadata': {'annotations': annotation}} From c75bca12fd8981a03bb36c7b2db889d87291cd48 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sun, 25 Aug 2024 01:41:07 +0000 Subject: [PATCH 09/21] nit --- sky/backends/cloud_vm_ray_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index afd5e391d7e..c4dd11a9180 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -4128,7 +4128,7 @@ def set_autostop(self, global_user_state.set_cluster_autostop_value( handle.cluster_name, idle_minutes_to_autostop, down) - # Add/Remove autodown annotations from Kubernetes pods. + # Add/Remove autodown annotations to/from Kubernetes pods. if isinstance(handle.launched_resources.cloud, clouds.Kubernetes): kubernetes_utils.set_autodown_annotations( handle=handle, From 68af033ed6359598f5c6eaababd62095befda7a4 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sun, 25 Aug 2024 02:07:37 +0000 Subject: [PATCH 10/21] Use constant to represent idle_minutes_to_autostop for cancellation --- sky/cli.py | 5 +++-- sky/core.py | 2 +- sky/provision/kubernetes/utils.py | 4 ++-- sky/skylet/constants.py | 4 +++- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index e50aca011a6..b6f1cfcd299 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -2257,7 +2257,7 @@ def autostop( 'Only one of --idle-minutes and --cancel should be specified. ' f'cancel: {cancel}, idle_minutes: {idle_minutes}') if cancel: - idle_minutes = -1 + idle_minutes = constants.IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL elif idle_minutes is None: idle_minutes = 5 _down_or_stop_clusters(clusters, @@ -2704,7 +2704,8 @@ def _down_or_stop_clusters( operation = 'Terminating' if down else 'Stopping' if idle_minutes_to_autostop is not None: - is_cancel = idle_minutes_to_autostop < 0 + is_cancel = (idle_minutes_to_autostop == + constants.IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL) verb = 'Cancelling' if is_cancel else 'Scheduling' option_str = 'down' if down else 'stop' if is_cancel: diff --git a/sky/core.py b/sky/core.py index 85f81ac6c7a..6563f8d8192 100644 --- a/sky/core.py +++ b/sky/core.py @@ -453,7 +453,7 @@ def autostop( sky.exceptions.CloudUserIdentityError: if we fail to get the current user identity. """ - is_cancel = idle_minutes < 0 + is_cancel = idle_minutes == constants.IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL verb = 'Cancelling' if is_cancel else 'Scheduling' option_str = 'down' if down else 'stop' if is_cancel: diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 070b1e64985..d2a5029e63b 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -1849,8 +1849,8 @@ def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle', annotation=annotation, namespace=namespace) - elif (idle_minutes_to_autostop is not None and - idle_minutes_to_autostop < 0): + elif (idle_minutes_to_autostop is not None and idle_minutes_to_autostop + == constants.IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL): for annotation_key in annotation_keys: _remove_pod_annotations(pod=pod, annotation_key=annotation_key, diff --git a/sky/skylet/constants.py b/sky/skylet/constants.py index 30820a3a91e..5c1a9208435 100644 --- a/sky/skylet/constants.py +++ b/sky/skylet/constants.py @@ -248,7 +248,9 @@ # controller and sky serve controller. # TODO(tian): Refactor to controller_utils. Current blocker: circular import. CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP = 10 - +# idle_minutes_to_autostop value used to indicate cancellation on registered +# autostop/down. +IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL = -1 # Due to the CPU/memory usage of the controller process launched with sky jobs ( # use ray job under the hood), we need to reserve some CPU/memory for each jobs/ # serve controller process. From 7a26edc64b5ff6493f6c365e0584bcb842898870 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Tue, 27 Aug 2024 02:43:43 +0000 Subject: [PATCH 11/21] revert using constants for cancel --- sky/cli.py | 5 ++--- sky/core.py | 2 +- sky/provision/kubernetes/utils.py | 4 ++-- sky/skylet/constants.py | 4 +--- 4 files changed, 6 insertions(+), 9 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index b6f1cfcd299..e50aca011a6 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -2257,7 +2257,7 @@ def autostop( 'Only one of --idle-minutes and --cancel should be specified. ' f'cancel: {cancel}, idle_minutes: {idle_minutes}') if cancel: - idle_minutes = constants.IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL + idle_minutes = -1 elif idle_minutes is None: idle_minutes = 5 _down_or_stop_clusters(clusters, @@ -2704,8 +2704,7 @@ def _down_or_stop_clusters( operation = 'Terminating' if down else 'Stopping' if idle_minutes_to_autostop is not None: - is_cancel = (idle_minutes_to_autostop == - constants.IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL) + is_cancel = idle_minutes_to_autostop < 0 verb = 'Cancelling' if is_cancel else 'Scheduling' option_str = 'down' if down else 'stop' if is_cancel: diff --git a/sky/core.py b/sky/core.py index 6563f8d8192..85f81ac6c7a 100644 --- a/sky/core.py +++ b/sky/core.py @@ -453,7 +453,7 @@ def autostop( sky.exceptions.CloudUserIdentityError: if we fail to get the current user identity. """ - is_cancel = idle_minutes == constants.IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL + is_cancel = idle_minutes < 0 verb = 'Cancelling' if is_cancel else 'Scheduling' option_str = 'down' if down else 'stop' if is_cancel: diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index d2a5029e63b..070b1e64985 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -1849,8 +1849,8 @@ def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle', annotation=annotation, namespace=namespace) - elif (idle_minutes_to_autostop is not None and idle_minutes_to_autostop - == constants.IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL): + elif (idle_minutes_to_autostop is not None and + idle_minutes_to_autostop < 0): for annotation_key in annotation_keys: _remove_pod_annotations(pod=pod, annotation_key=annotation_key, diff --git a/sky/skylet/constants.py b/sky/skylet/constants.py index 5c1a9208435..30820a3a91e 100644 --- a/sky/skylet/constants.py +++ b/sky/skylet/constants.py @@ -248,9 +248,7 @@ # controller and sky serve controller. # TODO(tian): Refactor to controller_utils. Current blocker: circular import. CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP = 10 -# idle_minutes_to_autostop value used to indicate cancellation on registered -# autostop/down. -IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL = -1 + # Due to the CPU/memory usage of the controller process launched with sky jobs ( # use ray job under the hood), we need to reserve some CPU/memory for each jobs/ # serve controller process. From 42b8dda81e25d81e4f7f56bd55ec91ae8041adb3 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Tue, 27 Aug 2024 04:01:03 +0000 Subject: [PATCH 12/21] nit --- sky/provision/kubernetes/utils.py | 66 +++++++++++++++++++------------ 1 file changed, 40 insertions(+), 26 deletions(-) diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 070b1e64985..c7863cb99a2 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -75,6 +75,9 @@ KUBERNETES_AUTODOWN_ANNOTATIONS_KEYS = [ AUTODOWN_ANNOTATIONS_KEY, IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY ] +ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG = ('Pod {pod_name} not found in namespace ' + '{namespace} while trying to {action} ' + 'Annotations {annotations}.') logger = sky_logging.init_logger(__name__) @@ -1778,15 +1781,15 @@ def filter_pods(namespace: str, return {pod.metadata.name: pod for pod in pods} -def _remove_pod_annotations(pod: V1Pod, annotation_key: str, +def _remove_pod_annotations(pod: V1Pod, annotations_key: str, namespace: str) -> None: - """Removes specified Annotation from a Kubernetes pod.""" + """Removes specified Annotations from a Kubernetes pod.""" try: # Remove the specified annotations if pod.metadata.annotations: - if annotation_key in pod.metadata.annotations: + if annotations_key in pod.metadata.annotations: # Patch the pod with the updated metadata - body = {'metadata': {'annotations': {annotation_key: None}}} + body = {'metadata': {'annotations': {annotations_key: None}}} kubernetes.core_api().patch_namespaced_pod( name=pod.metadata.name, namespace=namespace, @@ -1795,19 +1798,23 @@ def _remove_pod_annotations(pod: V1Pod, annotation_key: str, except kubernetes.api_exception() as e: if e.status == 404: - logger.warning(f'Pod {pod.metadata.name} not found in namespace ' - f'{namespace} while trying to remove Annotations ' - f'{annotation_key}.') + logger.warning( + ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG.format( + pod_name=pod.metadata.name, + namespace=namespace, + action='remove', + annotations=annotations_key)) else: - raise + with ux_utils.print_exception_no_traceback(): + raise -def _add_pod_annotations(pod: V1Pod, annotation: Dict[str, str], +def _add_pod_annotations(pod: V1Pod, annotations: Dict[str, str], namespace: str) -> None: """Adds specified Annotations on a Kubernetes pod.""" try: # Patch the pod with the updated metadata - body = {'metadata': {'annotations': annotation}} + body = {'metadata': {'annotations': annotations}} kubernetes.core_api().patch_namespaced_pod( name=pod.metadata.name, namespace=namespace, @@ -1816,42 +1823,49 @@ def _add_pod_annotations(pod: V1Pod, annotation: Dict[str, str], except kubernetes.api_exception() as e: if e.status == 404: - logger.warning(f'Pod {pod.metadata.name} not found in namespace ' - f'{namespace} while trying to remove Annotations ' - f'{annotation}.') + logger.warning( + ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG.format( + pod_name=pod.metadata.name, + namespace=namespace, + action='add', + annotations=annotations)) else: - raise + with ux_utils.print_exception_no_traceback(): + raise def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle', idle_minutes_to_autostop: Optional[int], down: bool = False) -> None: - """Adds or removes Annotations of autodown on Kubernetes pods. """ - annotation_keys = KUBERNETES_AUTODOWN_ANNOTATIONS_KEYS + """Adds or removes Annotations of autodown on Kubernetes pods.""" + annotations_keys = KUBERNETES_AUTODOWN_ANNOTATIONS_KEYS tags = { provision_constants.TAG_RAY_CLUSTER_NAME: handle.cluster_name_on_cloud, } ray_config = common_utils.read_yaml(handle.cluster_yaml) provider_config = ray_config['provider'] namespace = get_namespace(provider_config) - # Pods are already in 'Running' status running_pods = filter_pods(namespace, tags) + for _, pod in running_pods.items(): if down: - for annotation_key in annotation_keys: - annotation: Dict[str, str] = {} - if annotation_key == IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY: - annotation.update( - {annotation_key: str(idle_minutes_to_autostop)}) + for annotations_key in annotations_keys: + annotations: Dict[str, str] = {} + if annotations_key == IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY: + annotations.update( + {annotations_key: str(idle_minutes_to_autostop)}) else: # annotation_key == AUTODOWN_ANNOTATIONS_KEY - annotation.update({annotation_key: str(True)}) + annotations.update({annotations_key: str(True)}) _add_pod_annotations(pod=pod, - annotation=annotation, + annotations=annotations, namespace=namespace) + # If idle_minutes_to_autostop is negative, it indicates a request to + # cancel autostop using the --cancel flag with the `sky autostop` + # command. elif (idle_minutes_to_autostop is not None and idle_minutes_to_autostop < 0): - for annotation_key in annotation_keys: + for annotations_key in annotations_keys: _remove_pod_annotations(pod=pod, - annotation_key=annotation_key, + annotations_key=annotations_key, namespace=namespace) From 6f71e379f9834c4aa7a09997a1a9016182f42f8f Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Tue, 27 Aug 2024 04:57:04 +0000 Subject: [PATCH 13/21] nit --- sky/provision/kubernetes/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index c7863cb99a2..64f2fd7e29a 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -11,7 +11,6 @@ from urllib.parse import urlparse import jinja2 -from kubernetes.client import V1Pod import yaml import sky @@ -79,6 +78,8 @@ '{namespace} while trying to {action} ' 'Annotations {annotations}.') +V1Pod = Any + logger = sky_logging.init_logger(__name__) From 73139a0e94465228275cfc41d57244289d44e0ab Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Wed, 28 Aug 2024 03:45:27 +0000 Subject: [PATCH 14/21] add smoke tests --- tests/test_smoke.py | 58 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 28eeeed5190..b044d13b0fa 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -2061,6 +2061,64 @@ def test_task_labels_kubernetes(): run_one_test(test) +# ---------- Pod Annotations on Kubernetes ---------- +@pytest.mark.kubernetes +def test_add_pod_annotations_for_autodown_with_launch(): + name = _get_cluster_name() + test = Test( + 'add_pod_annotations_for_autodown_with_launch', + [ + # Launch Kubernetes cluster with two nodes, each being head node and worker node. + # Autodown is set. + f'sky launch -y -c {name} -i 10 --down --num-nodes 2 --cpus=1 --cloud kubernetes', + # Get names of the pods containing cluster name. + f'pod_1=$(kubectl get pods -o name | grep {name} | sed -n 1p)', + f'pod_2=$(kubectl get pods -o name | grep {name} | sed -n 2p)', + # Describe the first pod and check for annotations. + 'kubectl describe pod $pod_1 | grep -q skypilot.co/autodown', + 'kubectl describe pod $pod_1 | grep -q skypilot.co/idle_minutes_to_autostop', + # Describe the second pod and check for annotations. + 'kubectl describe pod $pod_2 | grep -q skypilot.co/autodown', + 'kubectl describe pod $pod_2 | grep -q skypilot.co/idle_minutes_to_autostop' + ], + f'sky down -y {name}', + ) + run_one_test(test) + + +@pytest.mark.kubernetes +def test_add_and_remove_pod_annotations_with_autostop(): + name = _get_cluster_name() + test = Test( + 'add_and_remove_pod_annotations_with_autostop', + [ + # Launch Kubernetes cluster with two nodes, each being head node and worker node. + f'sky launch -y -c {name} --num-nodes 2 --cpus=1 --cloud kubernetes', + # Set autodown on the cluster with 'autostop' command. + f'sky autostop -y {name} -i 20 --down', + # Get names of the pods containing cluster name. + f'pod_1=$(kubectl get pods -o name | grep {name} | sed -n 1p)', + f'pod_2=$(kubectl get pods -o name | grep {name} | sed -n 2p)', + # Describe the first pod and check for annotations. + 'kubectl describe pod $pod_1 | grep -q skypilot.co/autodown', + 'kubectl describe pod $pod_1 | grep -q skypilot.co/idle_minutes_to_autostop', + # Describe the second pod and check for annotations. + 'kubectl describe pod $pod_2 | grep -q skypilot.co/autodown', + 'kubectl describe pod $pod_2 | grep -q skypilot.co/idle_minutes_to_autostop', + # Cancel the set autodown to remove the annotations from the pods. + f'sky autostop -y {name} --cancel', + # Describe the first pod and check if annotations are removed. + '! kubectl describe pod $pod_1 | grep -q skypilot.co/autodown', + '! kubectl describe pod $pod_1 | grep -q skypilot.co/idle_minutes_to_autostop', + # Describe the second pod and check if annotations are removed. + '! kubectl describe pod $pod_2 | grep -q skypilot.co/autodown', + '! kubectl describe pod $pod_2 | grep -q skypilot.co/idle_minutes_to_autostop', + ], + f'sky down -y {name}', + ) + run_one_test(test) + + # ---------- Task: n=2 nodes with setups. ---------- @pytest.mark.no_lambda_cloud # Lambda Cloud does not have V100 gpus @pytest.mark.no_ibm # IBM cloud currently doesn't provide public image with CUDA From c2927ca6d662112b50dcd5d6f9696a7d8b320eed Mon Sep 17 00:00:00 2001 From: landscapepainter <34902420+landscapepainter@users.noreply.github.com> Date: Tue, 3 Sep 2024 18:41:15 -0700 Subject: [PATCH 15/21] Update sky/provision/kubernetes/utils.py Co-authored-by: Romil Bhardwaj --- sky/provision/kubernetes/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 64f2fd7e29a..4c7dc5e0d6e 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -1856,7 +1856,7 @@ def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle', annotations.update( {annotations_key: str(idle_minutes_to_autostop)}) else: # annotation_key == AUTODOWN_ANNOTATIONS_KEY - annotations.update({annotations_key: str(True)}) + annotations.update({annotations_key: 'true')}) _add_pod_annotations(pod=pod, annotations=annotations, namespace=namespace) From f0910078b25494b1ac32934ce29d236204ee6835 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Wed, 4 Sep 2024 03:05:53 +0000 Subject: [PATCH 16/21] fix comments --- sky/provision/kubernetes/utils.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 4c7dc5e0d6e..38e8034487d 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -68,17 +68,16 @@ POD_STATUSES = { 'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Terminating' } -AUTODOWN_ANNOTATIONS_KEY = 'skypilot.co/autodown' -IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY = ( +AUTODOWN_ANNOTATION_KEY = 'skypilot.co/autodown' +IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY = ( 'skypilot.co/idle_minutes_to_autostop') KUBERNETES_AUTODOWN_ANNOTATIONS_KEYS = [ - AUTODOWN_ANNOTATIONS_KEY, IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY + AUTODOWN_ANNOTATION_KEY, IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY ] ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG = ('Pod {pod_name} not found in namespace ' '{namespace} while trying to {action} ' 'Annotations {annotations}.') -V1Pod = Any logger = sky_logging.init_logger(__name__) @@ -1782,14 +1781,14 @@ def filter_pods(namespace: str, return {pod.metadata.name: pod for pod in pods} -def _remove_pod_annotations(pod: V1Pod, annotations_key: str, +def _remove_pod_annotations(pod: Any, annotations_key: str, namespace: str) -> None: """Removes specified Annotations from a Kubernetes pod.""" try: # Remove the specified annotations if pod.metadata.annotations: if annotations_key in pod.metadata.annotations: - # Patch the pod with the updated metadata + # Patch the pod with the updated metadata. body = {'metadata': {'annotations': {annotations_key: None}}} kubernetes.core_api().patch_namespaced_pod( name=pod.metadata.name, @@ -1810,7 +1809,7 @@ def _remove_pod_annotations(pod: V1Pod, annotations_key: str, raise -def _add_pod_annotations(pod: V1Pod, annotations: Dict[str, str], +def _add_pod_annotations(pod: Any, annotations: Dict[str, str], namespace: str) -> None: """Adds specified Annotations on a Kubernetes pod.""" try: @@ -1852,11 +1851,11 @@ def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle', if down: for annotations_key in annotations_keys: annotations: Dict[str, str] = {} - if annotations_key == IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY: + if annotations_key == IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY: annotations.update( {annotations_key: str(idle_minutes_to_autostop)}) - else: # annotation_key == AUTODOWN_ANNOTATIONS_KEY - annotations.update({annotations_key: 'true')}) + else: # annotation_key == AUTODOWN_ANNOTATION_KEY + annotations.update({annotations_key: 'true'}) _add_pod_annotations(pod=pod, annotations=annotations, namespace=namespace) From aaa28b79bf5cd491bb64e315d3efa7979f31c9fe Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Wed, 4 Sep 2024 03:14:07 +0000 Subject: [PATCH 17/21] nit --- sky/provision/kubernetes/utils.py | 49 +++++++++++++++---------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 38e8034487d..1fcc55576e8 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -76,8 +76,7 @@ ] ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG = ('Pod {pod_name} not found in namespace ' '{namespace} while trying to {action} ' - 'Annotations {annotations}.') - + 'an annotation {annotation}.') logger = sky_logging.init_logger(__name__) @@ -1781,15 +1780,15 @@ def filter_pods(namespace: str, return {pod.metadata.name: pod for pod in pods} -def _remove_pod_annotations(pod: Any, annotations_key: str, - namespace: str) -> None: +def _remove_pod_annotation(pod: Any, annotation_key: str, + namespace: str) -> None: """Removes specified Annotations from a Kubernetes pod.""" try: - # Remove the specified annotations + # Remove the specified annotation if pod.metadata.annotations: - if annotations_key in pod.metadata.annotations: + if annotation_key in pod.metadata.annotations: # Patch the pod with the updated metadata. - body = {'metadata': {'annotations': {annotations_key: None}}} + body = {'metadata': {'annotations': {annotation_key: None}}} kubernetes.core_api().patch_namespaced_pod( name=pod.metadata.name, namespace=namespace, @@ -1803,18 +1802,18 @@ def _remove_pod_annotations(pod: Any, annotations_key: str, pod_name=pod.metadata.name, namespace=namespace, action='remove', - annotations=annotations_key)) + annotation=annotation_key)) else: with ux_utils.print_exception_no_traceback(): raise -def _add_pod_annotations(pod: Any, annotations: Dict[str, str], - namespace: str) -> None: +def _add_pod_annotation(pod: Any, annotation: Dict[str, str], + namespace: str) -> None: """Adds specified Annotations on a Kubernetes pod.""" try: # Patch the pod with the updated metadata - body = {'metadata': {'annotations': annotations}} + body = {'metadata': {'annotations': annotation}} kubernetes.core_api().patch_namespaced_pod( name=pod.metadata.name, namespace=namespace, @@ -1828,7 +1827,7 @@ def _add_pod_annotations(pod: Any, annotations: Dict[str, str], pod_name=pod.metadata.name, namespace=namespace, action='add', - annotations=annotations)) + annotation=annotation)) else: with ux_utils.print_exception_no_traceback(): raise @@ -1849,23 +1848,23 @@ def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle', for _, pod in running_pods.items(): if down: - for annotations_key in annotations_keys: - annotations: Dict[str, str] = {} - if annotations_key == IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY: - annotations.update( - {annotations_key: str(idle_minutes_to_autostop)}) + for annotation_key in annotations_keys: + annotation: Dict[str, str] = {} + if annotation_key == IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY: + annotation.update( + {annotation_key: str(idle_minutes_to_autostop)}) else: # annotation_key == AUTODOWN_ANNOTATION_KEY - annotations.update({annotations_key: 'true'}) - _add_pod_annotations(pod=pod, - annotations=annotations, - namespace=namespace) + annotation.update({annotation_key: 'true'}) + _add_pod_annotation(pod=pod, + annotation=annotation, + namespace=namespace) # If idle_minutes_to_autostop is negative, it indicates a request to # cancel autostop using the --cancel flag with the `sky autostop` # command. elif (idle_minutes_to_autostop is not None and idle_minutes_to_autostop < 0): - for annotations_key in annotations_keys: - _remove_pod_annotations(pod=pod, - annotations_key=annotations_key, - namespace=namespace) + for annotation_key in annotations_keys: + _remove_pod_annotation(pod=pod, + annotation_key=annotation_key, + namespace=namespace) From 6219702eeacdd216cde66ee73ce6da523c342fa0 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Wed, 11 Sep 2024 02:46:39 +0000 Subject: [PATCH 18/21] remove loops and annotate one by one --- sky/provision/kubernetes/utils.py | 32 +++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 1fcc55576e8..c8aff54c711 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -71,9 +71,6 @@ AUTODOWN_ANNOTATION_KEY = 'skypilot.co/autodown' IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY = ( 'skypilot.co/idle_minutes_to_autostop') -KUBERNETES_AUTODOWN_ANNOTATIONS_KEYS = [ - AUTODOWN_ANNOTATION_KEY, IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY -] ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG = ('Pod {pod_name} not found in namespace ' '{namespace} while trying to {action} ' 'an annotation {annotation}.') @@ -1837,7 +1834,6 @@ def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle', idle_minutes_to_autostop: Optional[int], down: bool = False) -> None: """Adds or removes Annotations of autodown on Kubernetes pods.""" - annotations_keys = KUBERNETES_AUTODOWN_ANNOTATIONS_KEYS tags = { provision_constants.TAG_RAY_CLUSTER_NAME: handle.cluster_name_on_cloud, } @@ -1848,23 +1844,27 @@ def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle', for _, pod in running_pods.items(): if down: - for annotation_key in annotations_keys: - annotation: Dict[str, str] = {} - if annotation_key == IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY: - annotation.update( - {annotation_key: str(idle_minutes_to_autostop)}) - else: # annotation_key == AUTODOWN_ANNOTATION_KEY - annotation.update({annotation_key: 'true'}) - _add_pod_annotation(pod=pod, - annotation=annotation, - namespace=namespace) + idle_minutes_to_autostop_annotation = { + IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY: str(idle_minutes_to_autostop) + } + autodown_annotation = { + AUTODOWN_ANNOTATION_KEY: 'true' + } + _add_pod_annotation(pod=pod, + annotation=idle_minutes_to_autostop_annotation, + namespace=namespace) + _add_pod_annotation(pod=pod, + annotation=autodown_annotation, + namespace=namespace) # If idle_minutes_to_autostop is negative, it indicates a request to # cancel autostop using the --cancel flag with the `sky autostop` # command. elif (idle_minutes_to_autostop is not None and idle_minutes_to_autostop < 0): - for annotation_key in annotations_keys: _remove_pod_annotation(pod=pod, - annotation_key=annotation_key, + annotation_key=IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY, + namespace=namespace) + _remove_pod_annotation(pod=pod, + annotation_key=AUTODOWN_ANNOTATION_KEY, namespace=namespace) From 07a5a54d9964f7211b24e5f3a50b86203c8b0dbb Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Wed, 11 Sep 2024 02:49:03 +0000 Subject: [PATCH 19/21] format --- sky/provision/kubernetes/utils.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index c8aff54c711..d51f6f31c1d 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -1845,11 +1845,10 @@ def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle', for _, pod in running_pods.items(): if down: idle_minutes_to_autostop_annotation = { - IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY: str(idle_minutes_to_autostop) - } - autodown_annotation = { - AUTODOWN_ANNOTATION_KEY: 'true' + IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY: + str(idle_minutes_to_autostop) } + autodown_annotation = {AUTODOWN_ANNOTATION_KEY: 'true'} _add_pod_annotation(pod=pod, annotation=idle_minutes_to_autostop_annotation, namespace=namespace) @@ -1862,9 +1861,10 @@ def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle', # command. elif (idle_minutes_to_autostop is not None and idle_minutes_to_autostop < 0): - _remove_pod_annotation(pod=pod, - annotation_key=IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY, - namespace=namespace) - _remove_pod_annotation(pod=pod, - annotation_key=AUTODOWN_ANNOTATION_KEY, - namespace=namespace) + _remove_pod_annotation( + pod=pod, + annotation_key=IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY, + namespace=namespace) + _remove_pod_annotation(pod=pod, + annotation_key=AUTODOWN_ANNOTATION_KEY, + namespace=namespace) From 38a64a10a90725f02005b779888ac19e0591e03e Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sat, 14 Sep 2024 03:48:00 +0000 Subject: [PATCH 20/21] update with autodown annotation with context --- sky/provision/kubernetes/instance.py | 2 +- sky/provision/kubernetes/utils.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py index a4630263eb4..18feaa18bd3 100644 --- a/sky/provision/kubernetes/instance.py +++ b/sky/provision/kubernetes/instance.py @@ -682,7 +682,7 @@ def terminate_instances( worker_only: bool = False, ) -> None: """See sky/provision/__init__.py""" - namespace = kubernetes_utils.get_namespace(provider_config) + namespace = kubernetes_utils.get_namespace_from_config(provider_config) context = kubernetes_utils.get_context_from_config(provider_config) tag_filters = { TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud, diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index f264defb6df..7616d247f9f 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -1777,7 +1777,7 @@ def get_namespace_from_config(provider_config: Dict[str, Any]) -> str: get_current_kube_config_context_namespace()) -def filter_pods(namespace: str, +def filter_pods(namespace: str, context: str, tag_filters: Dict[str, str], status_filters: Optional[List[str]] = None) -> Dict[str, Any]: """Filters pods by tags and status.""" @@ -1790,7 +1790,7 @@ def filter_pods(namespace: str, [f'status.phase!={status}' for status in non_included_pod_statuses]) label_selector = to_label_selector(tag_filters) - pod_list = kubernetes.core_api().list_namespaced_pod( + pod_list = kubernetes.core_api(context).list_namespaced_pod( namespace, field_selector=field_selector, label_selector=label_selector) # Don't return pods marked for deletion, @@ -1864,7 +1864,8 @@ def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle', ray_config = common_utils.read_yaml(handle.cluster_yaml) provider_config = ray_config['provider'] namespace = get_namespace_from_config(provider_config) - running_pods = filter_pods(namespace, tags) + context = get_context_from_config(provider_config) + running_pods = filter_pods(namespace, context, tags) for _, pod in running_pods.items(): if down: From ef0b295f92f6875e171933bb3e18a92600665f11 Mon Sep 17 00:00:00 2001 From: Doyoung Kim Date: Sat, 14 Sep 2024 03:49:16 +0000 Subject: [PATCH 21/21] format --- sky/provision/kubernetes/instance.py | 11 ++++++----- sky/provision/kubernetes/utils.py | 3 ++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py index 18feaa18bd3..f9ee75e466b 100644 --- a/sky/provision/kubernetes/instance.py +++ b/sky/provision/kubernetes/instance.py @@ -448,8 +448,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str, 'terminating pods. Waiting them to finish: ' f'{list(terminating_pods.keys())}') time.sleep(POLL_INTERVAL) - terminating_pods = kubernetes_utils.filter_pods(namespace, context, tags, - ['Terminating']) + terminating_pods = kubernetes_utils.filter_pods(namespace, context, + tags, ['Terminating']) if len(terminating_pods) > 0: # If there are still terminating pods, we force delete them. @@ -548,7 +548,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str, if head_pod_name is None: head_pod_name = pod.metadata.name - wait_pods_dict = kubernetes_utils.filter_pods(namespace, context, tags, ['Pending']) + wait_pods_dict = kubernetes_utils.filter_pods(namespace, context, tags, + ['Pending']) wait_pods = list(wait_pods_dict.values()) networking_mode = network_utils.get_networking_mode( @@ -578,7 +579,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str, logger.debug(f'run_instances: all pods are scheduled and running: ' f'{list(wait_pods_dict.keys())}') - running_pods = kubernetes_utils.filter_pods(namespace, context, tags, ['Running']) + running_pods = kubernetes_utils.filter_pods(namespace, context, tags, + ['Running']) initialized_pods = kubernetes_utils.filter_pods(namespace, context, { TAG_POD_INITIALIZED: 'true', **tags @@ -689,7 +691,6 @@ def terminate_instances( } pods = kubernetes_utils.filter_pods(namespace, context, tag_filters, None) - def _is_head(pod) -> bool: return pod.metadata.labels[constants.TAG_RAY_NODE_KIND] == 'head' diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 7616d247f9f..a8abb24b917 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -1777,7 +1777,8 @@ def get_namespace_from_config(provider_config: Dict[str, Any]) -> str: get_current_kube_config_context_namespace()) -def filter_pods(namespace: str, context: str, +def filter_pods(namespace: str, + context: str, tag_filters: Dict[str, str], status_filters: Optional[List[str]] = None) -> Dict[str, Any]: """Filters pods by tags and status."""