Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[k8s] Add cluster attributes(autodown, idle-minutes-to-autostop) as annotations to the pod #3870

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from sky.provision import instance_setup
from sky.provision import metadata_utils
from sky.provision import provisioner
from sky.provision.kubernetes import utils as kubernetes_utils
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we quickly check if this global import does not break skypilot if kubernetes dependencies are not installed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested by running sky launch --cloud gcp -y from a new env where only pip install -e ".[gcp]" is set, and there was no issue raised.

from sky.skylet import autostop_lib
from sky.skylet import constants
from sky.skylet import job_lib
Expand Down Expand Up @@ -4127,6 +4128,13 @@ def set_autostop(self,
global_user_state.set_cluster_autostop_value(
handle.cluster_name, idle_minutes_to_autostop, down)

# Add/Remove autodown annotations to/from Kubernetes pods.
if isinstance(handle.launched_resources.cloud, clouds.Kubernetes):
kubernetes_utils.set_autodown_annotations(
handle=handle,
idle_minutes_to_autostop=idle_minutes_to_autostop,
down=down)

def is_definitely_autostopping(self,
handle: CloudVmRayResourceHandle,
stream_logs: bool = True) -> bool:
Expand Down
5 changes: 3 additions & 2 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2257,7 +2257,7 @@ def autostop(
'Only one of --idle-minutes and --cancel should be specified. '
f'cancel: {cancel}, idle_minutes: {idle_minutes}')
if cancel:
idle_minutes = -1
idle_minutes = constants.IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL
elif idle_minutes is None:
idle_minutes = 5
_down_or_stop_clusters(clusters,
Expand Down Expand Up @@ -2704,7 +2704,8 @@ def _down_or_stop_clusters(

operation = 'Terminating' if down else 'Stopping'
if idle_minutes_to_autostop is not None:
is_cancel = idle_minutes_to_autostop < 0
is_cancel = (idle_minutes_to_autostop ==
constants.IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL)
landscapepainter marked this conversation as resolved.
Show resolved Hide resolved
verb = 'Cancelling' if is_cancel else 'Scheduling'
option_str = 'down' if down else 'stop'
if is_cancel:
Expand Down
2 changes: 1 addition & 1 deletion sky/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ def autostop(
sky.exceptions.CloudUserIdentityError: if we fail to get the current
user identity.
"""
is_cancel = idle_minutes < 0
is_cancel = idle_minutes == constants.IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL
verb = 'Cancelling' if is_cancel else 'Scheduling'
option_str = 'down' if down else 'stop'
if is_cancel:
Expand Down
70 changes: 16 additions & 54 deletions sky/provision/kubernetes/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,48 +28,6 @@
TAG_SKYPILOT_CLUSTER_NAME = 'skypilot-cluster-name'
TAG_POD_INITIALIZED = 'skypilot-initialized'

POD_STATUSES = {
'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Terminating'
}


def to_label_selector(tags):
label_selector = ''
for k, v in tags.items():
if label_selector != '':
label_selector += ','
label_selector += '{}={}'.format(k, v)
return label_selector


def _get_namespace(provider_config: Dict[str, Any]) -> str:
return provider_config.get(
'namespace',
kubernetes_utils.get_current_kube_config_context_namespace())


def _filter_pods(namespace: str, tag_filters: Dict[str, str],
status_filters: Optional[List[str]]) -> Dict[str, Any]:
"""Filters pods by tags and status."""
non_included_pod_statuses = POD_STATUSES.copy()

field_selector = ''
if status_filters is not None:
non_included_pod_statuses -= set(status_filters)
field_selector = ','.join(
[f'status.phase!={status}' for status in non_included_pod_statuses])

label_selector = to_label_selector(tag_filters)
pod_list = kubernetes.core_api().list_namespaced_pod(
namespace, field_selector=field_selector, label_selector=label_selector)

# Don't return pods marked for deletion,
# i.e. pods with non-null metadata.DeletionTimestamp.
pods = [
pod for pod in pod_list.items if pod.metadata.deletion_timestamp is None
]
return {pod.metadata.name: pod for pod in pods}


def _get_head_pod_name(pods: Dict[str, Any]) -> Optional[str]:
head_pod_name = None
Expand Down Expand Up @@ -464,7 +422,7 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
config: common.ProvisionConfig) -> common.ProvisionRecord:
"""Create pods based on the config."""
provider_config = config.provider_config
namespace = _get_namespace(provider_config)
namespace = kubernetes_utils.get_namespace(provider_config)
pod_spec = copy.deepcopy(config.node_config)
tags = {
TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud,
Expand All @@ -477,15 +435,17 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
pod_spec['metadata']['labels'].update(
{TAG_SKYPILOT_CLUSTER_NAME: cluster_name_on_cloud})

terminating_pods = _filter_pods(namespace, tags, ['Terminating'])
terminating_pods = kubernetes_utils.filter_pods(namespace, tags,
['Terminating'])
start_time = time.time()
while (len(terminating_pods) > 0 and
time.time() - start_time < _TIMEOUT_FOR_POD_TERMINATION):
logger.debug(f'run_instances: Found {len(terminating_pods)} '
'terminating pods. Waiting them to finish: '
f'{list(terminating_pods.keys())}')
time.sleep(POLL_INTERVAL)
terminating_pods = _filter_pods(namespace, tags, ['Terminating'])
terminating_pods = kubernetes_utils.filter_pods(namespace, tags,
['Terminating'])

if len(terminating_pods) > 0:
# If there are still terminating pods, we force delete them.
Expand All @@ -502,7 +462,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
_request_timeout=config_lib.DELETION_TIMEOUT,
grace_period_seconds=0)

running_pods = _filter_pods(namespace, tags, ['Pending', 'Running'])
running_pods = kubernetes_utils.filter_pods(namespace, tags,
['Pending', 'Running'])
head_pod_name = _get_head_pod_name(running_pods)
logger.debug(f'Found {len(running_pods)} existing pods: '
f'{list(running_pods.keys())}')
Expand Down Expand Up @@ -579,7 +540,7 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
if head_pod_name is None:
head_pod_name = pod.metadata.name

wait_pods_dict = _filter_pods(namespace, tags, ['Pending'])
wait_pods_dict = kubernetes_utils.filter_pods(namespace, tags, ['Pending'])
wait_pods = list(wait_pods_dict.values())

networking_mode = network_utils.get_networking_mode(
Expand Down Expand Up @@ -609,8 +570,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
logger.debug(f'run_instances: all pods are scheduled and running: '
f'{list(wait_pods_dict.keys())}')

running_pods = _filter_pods(namespace, tags, ['Running'])
initialized_pods = _filter_pods(namespace, {
running_pods = kubernetes_utils.filter_pods(namespace, tags, ['Running'])
initialized_pods = kubernetes_utils.filter_pods(namespace, {
TAG_POD_INITIALIZED: 'true',
**tags
}, ['Running'])
Expand Down Expand Up @@ -712,11 +673,11 @@ def terminate_instances(
worker_only: bool = False,
) -> None:
"""See sky/provision/__init__.py"""
namespace = _get_namespace(provider_config)
namespace = kubernetes_utils.get_namespace(provider_config)
tag_filters = {
TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud,
}
pods = _filter_pods(namespace, tag_filters, None)
pods = kubernetes_utils.filter_pods(namespace, tag_filters, None)

def _is_head(pod) -> bool:
return pod.metadata.labels[constants.TAG_RAY_NODE_KIND] == 'head'
Expand All @@ -734,12 +695,13 @@ def get_cluster_info(
provider_config: Optional[Dict[str, Any]] = None) -> common.ClusterInfo:
del region # unused
assert provider_config is not None
namespace = _get_namespace(provider_config)
namespace = kubernetes_utils.get_namespace(provider_config)
tag_filters = {
TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud,
}

running_pods = _filter_pods(namespace, tag_filters, ['Running'])
running_pods = kubernetes_utils.filter_pods(namespace, tag_filters,
['Running'])
pods: Dict[str, List[common.InstanceInfo]] = {}
head_pod_name = None

Expand Down Expand Up @@ -858,7 +820,7 @@ def get_command_runners(
"""Get a command runner for the given cluster."""
assert cluster_info.provider_config is not None, cluster_info
instances = cluster_info.instances
namespace = _get_namespace(cluster_info.provider_config)
namespace = kubernetes_utils.get_namespace(cluster_info.provider_config)
node_list = []
if cluster_info.head_instance_id is not None:
node_list = [(namespace, cluster_info.head_instance_id)]
Expand Down
133 changes: 133 additions & 0 deletions sky/provision/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@
import re
import shutil
import subprocess
import typing
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from urllib.parse import urlparse

import jinja2
from kubernetes.client import V1Pod
import yaml

import sky
from sky import exceptions
from sky import sky_logging
from sky import skypilot_config
from sky.adaptors import kubernetes
from sky.provision import constants as provision_constants
from sky.provision.kubernetes import network_utils
from sky.skylet import constants
from sky.utils import common_utils
Expand All @@ -25,6 +28,9 @@
from sky.utils import schemas
from sky.utils import ux_utils

if typing.TYPE_CHECKING:
from sky import backends

# TODO(romilb): Move constants to constants.py
DEFAULT_NAMESPACE = 'default'

Expand Down Expand Up @@ -60,6 +66,16 @@
PORT_FORWARD_PROXY_CMD_TEMPLATE = 'kubernetes-port-forward-proxy-command.sh'
PORT_FORWARD_PROXY_CMD_PATH = '~/.sky/kubernetes-port-forward-proxy-command.sh'

POD_STATUSES = {
'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Terminating'
}
AUTODOWN_ANNOTATIONS_KEY = 'skypilot.co/autodown'
landscapepainter marked this conversation as resolved.
Show resolved Hide resolved
IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY = (
landscapepainter marked this conversation as resolved.
Show resolved Hide resolved
'skypilot.co/idle_minutes_to_autostop')
KUBERNETES_AUTODOWN_ANNOTATIONS_KEYS = [
AUTODOWN_ANNOTATIONS_KEY, IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY
]

logger = sky_logging.init_logger(__name__)


Expand Down Expand Up @@ -1722,3 +1738,120 @@ def get_kubernetes_node_info() -> Dict[str, KubernetesNodeInfo]:
free={'nvidia.com/gpu': int(accelerators_available)})

return node_info_dict


def to_label_selector(tags):
label_selector = ''
for k, v in tags.items():
if label_selector != '':
label_selector += ','
label_selector += '{}={}'.format(k, v)
return label_selector


def get_namespace(provider_config: Dict[str, Any]) -> str:
return provider_config.get('namespace',
get_current_kube_config_context_namespace())


def filter_pods(namespace: str,
tag_filters: Dict[str, str],
status_filters: Optional[List[str]] = None) -> Dict[str, Any]:
"""Filters pods by tags and status."""
non_included_pod_statuses = POD_STATUSES.copy()

field_selector = ''
if status_filters is not None:
non_included_pod_statuses -= set(status_filters)
field_selector = ','.join(
[f'status.phase!={status}' for status in non_included_pod_statuses])

label_selector = to_label_selector(tag_filters)
pod_list = kubernetes.core_api().list_namespaced_pod(
namespace, field_selector=field_selector, label_selector=label_selector)

# Don't return pods marked for deletion,
# i.e. pods with non-null metadata.DeletionTimestamp.
pods = [
pod for pod in pod_list.items if pod.metadata.deletion_timestamp is None
]
return {pod.metadata.name: pod for pod in pods}


def _remove_pod_annotations(pod: V1Pod, annotation_key: str,
namespace: str) -> None:
"""Removes specified Annotation from a Kubernetes pod."""
try:
# Remove the specified annotations
if pod.metadata.annotations:
if annotation_key in pod.metadata.annotations:
# Patch the pod with the updated metadata
body = {'metadata': {'annotations': {annotation_key: None}}}
kubernetes.core_api().patch_namespaced_pod(
name=pod.metadata.name,
namespace=namespace,
body=body,
_request_timeout=kubernetes.API_TIMEOUT)

except kubernetes.api_exception() as e:
if e.status == 404:
logger.warning(f'Pod {pod.metadata.name} not found in namespace '
f'{namespace} while trying to remove Annotations '
f'{annotation_key}.')
else:
raise


def _add_pod_annotations(pod: V1Pod, annotation: Dict[str, str],
namespace: str) -> None:
"""Adds specified Annotations on a Kubernetes pod."""
try:
# Patch the pod with the updated metadata
body = {'metadata': {'annotations': annotation}}
kubernetes.core_api().patch_namespaced_pod(
name=pod.metadata.name,
namespace=namespace,
body=body,
_request_timeout=kubernetes.API_TIMEOUT)

except kubernetes.api_exception() as e:
if e.status == 404:
logger.warning(f'Pod {pod.metadata.name} not found in namespace '
f'{namespace} while trying to remove Annotations '
f'{annotation}.')
else:
raise


def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle',
idle_minutes_to_autostop: Optional[int],
down: bool = False) -> None:
"""Adds or removes Annotations of autodown on Kubernetes pods. """
annotation_keys = KUBERNETES_AUTODOWN_ANNOTATIONS_KEYS
tags = {
provision_constants.TAG_RAY_CLUSTER_NAME: handle.cluster_name_on_cloud,
}
ray_config = common_utils.read_yaml(handle.cluster_yaml)
provider_config = ray_config['provider']
namespace = get_namespace(provider_config)
# Pods are already in 'Running' status
running_pods = filter_pods(namespace, tags)
for _, pod in running_pods.items():
if down:
for annotation_key in annotation_keys:
annotation: Dict[str, str] = {}
if annotation_key == IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY:
annotation.update(
{annotation_key: str(idle_minutes_to_autostop)})
else: # annotation_key == AUTODOWN_ANNOTATIONS_KEY
annotation.update({annotation_key: str(True)})
_add_pod_annotations(pod=pod,
annotation=annotation,
namespace=namespace)

elif (idle_minutes_to_autostop is not None and idle_minutes_to_autostop
== constants.IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL):
for annotation_key in annotation_keys:
_remove_pod_annotations(pod=pod,
annotation_key=annotation_key,
namespace=namespace)
4 changes: 3 additions & 1 deletion sky/skylet/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@
# controller and sky serve controller.
# TODO(tian): Refactor to controller_utils. Current blocker: circular import.
CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP = 10

# idle_minutes_to_autostop value used to indicate cancellation on registered
# autostop/down.
IDLE_MINUTES_TO_AUTOSTOP_FOR_CANCEL = -1
# Due to the CPU/memory usage of the controller process launched with sky jobs (
# use ray job under the hood), we need to reserve some CPU/memory for each jobs/
# serve controller process.
Expand Down
Loading