Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[k8s] Add cluster attributes(autodown, idle-minutes-to-autostop) as annotations to the pod #3870

Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from sky.provision import instance_setup
from sky.provision import metadata_utils
from sky.provision import provisioner
from sky.provision.kubernetes import utils as kubernetes_utils
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we quickly check if this global import does not break skypilot if kubernetes dependencies are not installed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested by running sky launch --cloud gcp -y from a new env where only pip install -e ".[gcp]" is set, and there was no issue raised.

from sky.skylet import autostop_lib
from sky.skylet import constants
from sky.skylet import job_lib
Expand Down Expand Up @@ -4127,6 +4128,13 @@ def set_autostop(self,
global_user_state.set_cluster_autostop_value(
handle.cluster_name, idle_minutes_to_autostop, down)

# Add/Remove autodown annotations to/from Kubernetes pods.
if isinstance(handle.launched_resources.cloud, clouds.Kubernetes):
kubernetes_utils.set_autodown_annotations(
handle=handle,
idle_minutes_to_autostop=idle_minutes_to_autostop,
down=down)

def is_definitely_autostopping(self,
handle: CloudVmRayResourceHandle,
stream_logs: bool = True) -> bool:
Expand Down
70 changes: 16 additions & 54 deletions sky/provision/kubernetes/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,48 +28,6 @@
TAG_SKYPILOT_CLUSTER_NAME = 'skypilot-cluster-name'
TAG_POD_INITIALIZED = 'skypilot-initialized'

POD_STATUSES = {
'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Terminating'
}


def to_label_selector(tags):
label_selector = ''
for k, v in tags.items():
if label_selector != '':
label_selector += ','
label_selector += '{}={}'.format(k, v)
return label_selector


def _get_namespace(provider_config: Dict[str, Any]) -> str:
return provider_config.get(
'namespace',
kubernetes_utils.get_current_kube_config_context_namespace())


def _filter_pods(namespace: str, tag_filters: Dict[str, str],
status_filters: Optional[List[str]]) -> Dict[str, Any]:
"""Filters pods by tags and status."""
non_included_pod_statuses = POD_STATUSES.copy()

field_selector = ''
if status_filters is not None:
non_included_pod_statuses -= set(status_filters)
field_selector = ','.join(
[f'status.phase!={status}' for status in non_included_pod_statuses])

label_selector = to_label_selector(tag_filters)
pod_list = kubernetes.core_api().list_namespaced_pod(
namespace, field_selector=field_selector, label_selector=label_selector)

# Don't return pods marked for deletion,
# i.e. pods with non-null metadata.DeletionTimestamp.
pods = [
pod for pod in pod_list.items if pod.metadata.deletion_timestamp is None
]
return {pod.metadata.name: pod for pod in pods}


def _get_head_pod_name(pods: Dict[str, Any]) -> Optional[str]:
head_pod_name = None
Expand Down Expand Up @@ -464,7 +422,7 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
config: common.ProvisionConfig) -> common.ProvisionRecord:
"""Create pods based on the config."""
provider_config = config.provider_config
namespace = _get_namespace(provider_config)
namespace = kubernetes_utils.get_namespace(provider_config)
pod_spec = copy.deepcopy(config.node_config)
tags = {
TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud,
Expand All @@ -477,15 +435,17 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
pod_spec['metadata']['labels'].update(
{TAG_SKYPILOT_CLUSTER_NAME: cluster_name_on_cloud})

terminating_pods = _filter_pods(namespace, tags, ['Terminating'])
terminating_pods = kubernetes_utils.filter_pods(namespace, tags,
['Terminating'])
start_time = time.time()
while (len(terminating_pods) > 0 and
time.time() - start_time < _TIMEOUT_FOR_POD_TERMINATION):
logger.debug(f'run_instances: Found {len(terminating_pods)} '
'terminating pods. Waiting them to finish: '
f'{list(terminating_pods.keys())}')
time.sleep(POLL_INTERVAL)
terminating_pods = _filter_pods(namespace, tags, ['Terminating'])
terminating_pods = kubernetes_utils.filter_pods(namespace, tags,
['Terminating'])

if len(terminating_pods) > 0:
# If there are still terminating pods, we force delete them.
Expand All @@ -502,7 +462,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
_request_timeout=config_lib.DELETION_TIMEOUT,
grace_period_seconds=0)

running_pods = _filter_pods(namespace, tags, ['Pending', 'Running'])
running_pods = kubernetes_utils.filter_pods(namespace, tags,
['Pending', 'Running'])
head_pod_name = _get_head_pod_name(running_pods)
logger.debug(f'Found {len(running_pods)} existing pods: '
f'{list(running_pods.keys())}')
Expand Down Expand Up @@ -579,7 +540,7 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
if head_pod_name is None:
head_pod_name = pod.metadata.name

wait_pods_dict = _filter_pods(namespace, tags, ['Pending'])
wait_pods_dict = kubernetes_utils.filter_pods(namespace, tags, ['Pending'])
wait_pods = list(wait_pods_dict.values())

networking_mode = network_utils.get_networking_mode(
Expand Down Expand Up @@ -609,8 +570,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
logger.debug(f'run_instances: all pods are scheduled and running: '
f'{list(wait_pods_dict.keys())}')

running_pods = _filter_pods(namespace, tags, ['Running'])
initialized_pods = _filter_pods(namespace, {
running_pods = kubernetes_utils.filter_pods(namespace, tags, ['Running'])
initialized_pods = kubernetes_utils.filter_pods(namespace, {
TAG_POD_INITIALIZED: 'true',
**tags
}, ['Running'])
Expand Down Expand Up @@ -712,11 +673,11 @@ def terminate_instances(
worker_only: bool = False,
) -> None:
"""See sky/provision/__init__.py"""
namespace = _get_namespace(provider_config)
namespace = kubernetes_utils.get_namespace(provider_config)
tag_filters = {
TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud,
}
pods = _filter_pods(namespace, tag_filters, None)
pods = kubernetes_utils.filter_pods(namespace, tag_filters, None)

def _is_head(pod) -> bool:
return pod.metadata.labels[constants.TAG_RAY_NODE_KIND] == 'head'
Expand All @@ -734,12 +695,13 @@ def get_cluster_info(
provider_config: Optional[Dict[str, Any]] = None) -> common.ClusterInfo:
del region # unused
assert provider_config is not None
namespace = _get_namespace(provider_config)
namespace = kubernetes_utils.get_namespace(provider_config)
tag_filters = {
TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud,
}

running_pods = _filter_pods(namespace, tag_filters, ['Running'])
running_pods = kubernetes_utils.filter_pods(namespace, tag_filters,
['Running'])
pods: Dict[str, List[common.InstanceInfo]] = {}
head_pod_name = None

Expand Down Expand Up @@ -858,7 +820,7 @@ def get_command_runners(
"""Get a command runner for the given cluster."""
assert cluster_info.provider_config is not None, cluster_info
instances = cluster_info.instances
namespace = _get_namespace(cluster_info.provider_config)
namespace = kubernetes_utils.get_namespace(cluster_info.provider_config)
node_list = []
if cluster_info.head_instance_id is not None:
node_list = [(namespace, cluster_info.head_instance_id)]
Expand Down
148 changes: 148 additions & 0 deletions sky/provision/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import re
import shutil
import subprocess
import typing
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from urllib.parse import urlparse

Expand All @@ -17,6 +18,7 @@
from sky import sky_logging
from sky import skypilot_config
from sky.adaptors import kubernetes
from sky.provision import constants as provision_constants
from sky.provision.kubernetes import network_utils
from sky.skylet import constants
from sky.utils import common_utils
Expand All @@ -25,6 +27,9 @@
from sky.utils import schemas
from sky.utils import ux_utils

if typing.TYPE_CHECKING:
from sky import backends

# TODO(romilb): Move constants to constants.py
DEFAULT_NAMESPACE = 'default'

Expand Down Expand Up @@ -60,6 +65,21 @@
PORT_FORWARD_PROXY_CMD_TEMPLATE = 'kubernetes-port-forward-proxy-command.sh'
PORT_FORWARD_PROXY_CMD_PATH = '~/.sky/kubernetes-port-forward-proxy-command.sh'

POD_STATUSES = {
'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Terminating'
}
AUTODOWN_ANNOTATIONS_KEY = 'skypilot.co/autodown'
landscapepainter marked this conversation as resolved.
Show resolved Hide resolved
IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY = (
landscapepainter marked this conversation as resolved.
Show resolved Hide resolved
'skypilot.co/idle_minutes_to_autostop')
KUBERNETES_AUTODOWN_ANNOTATIONS_KEYS = [
AUTODOWN_ANNOTATIONS_KEY, IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY
]
ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG = ('Pod {pod_name} not found in namespace '
'{namespace} while trying to {action} '
'Annotations {annotations}.')

V1Pod = Any
landscapepainter marked this conversation as resolved.
Show resolved Hide resolved

logger = sky_logging.init_logger(__name__)


Expand Down Expand Up @@ -1722,3 +1742,131 @@ def get_kubernetes_node_info() -> Dict[str, KubernetesNodeInfo]:
free={'nvidia.com/gpu': int(accelerators_available)})

return node_info_dict


def to_label_selector(tags):
label_selector = ''
for k, v in tags.items():
if label_selector != '':
label_selector += ','
label_selector += '{}={}'.format(k, v)
return label_selector


def get_namespace(provider_config: Dict[str, Any]) -> str:
return provider_config.get('namespace',
get_current_kube_config_context_namespace())


def filter_pods(namespace: str,
tag_filters: Dict[str, str],
status_filters: Optional[List[str]] = None) -> Dict[str, Any]:
"""Filters pods by tags and status."""
non_included_pod_statuses = POD_STATUSES.copy()

field_selector = ''
if status_filters is not None:
non_included_pod_statuses -= set(status_filters)
field_selector = ','.join(
[f'status.phase!={status}' for status in non_included_pod_statuses])

label_selector = to_label_selector(tag_filters)
pod_list = kubernetes.core_api().list_namespaced_pod(
namespace, field_selector=field_selector, label_selector=label_selector)

# Don't return pods marked for deletion,
# i.e. pods with non-null metadata.DeletionTimestamp.
pods = [
pod for pod in pod_list.items if pod.metadata.deletion_timestamp is None
]
return {pod.metadata.name: pod for pod in pods}


def _remove_pod_annotations(pod: V1Pod, annotations_key: str,
namespace: str) -> None:
"""Removes specified Annotations from a Kubernetes pod."""
try:
# Remove the specified annotations
if pod.metadata.annotations:
if annotations_key in pod.metadata.annotations:
# Patch the pod with the updated metadata
body = {'metadata': {'annotations': {annotations_key: None}}}
kubernetes.core_api().patch_namespaced_pod(
name=pod.metadata.name,
namespace=namespace,
body=body,
_request_timeout=kubernetes.API_TIMEOUT)

except kubernetes.api_exception() as e:
if e.status == 404:
logger.warning(
ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG.format(
pod_name=pod.metadata.name,
namespace=namespace,
action='remove',
annotations=annotations_key))
else:
with ux_utils.print_exception_no_traceback():
raise


def _add_pod_annotations(pod: V1Pod, annotations: Dict[str, str],
namespace: str) -> None:
"""Adds specified Annotations on a Kubernetes pod."""
try:
# Patch the pod with the updated metadata
body = {'metadata': {'annotations': annotations}}
kubernetes.core_api().patch_namespaced_pod(
name=pod.metadata.name,
namespace=namespace,
body=body,
_request_timeout=kubernetes.API_TIMEOUT)

except kubernetes.api_exception() as e:
if e.status == 404:
logger.warning(
ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG.format(
pod_name=pod.metadata.name,
namespace=namespace,
action='add',
annotations=annotations))
else:
with ux_utils.print_exception_no_traceback():
raise


def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle',
idle_minutes_to_autostop: Optional[int],
down: bool = False) -> None:
"""Adds or removes Annotations of autodown on Kubernetes pods."""
annotations_keys = KUBERNETES_AUTODOWN_ANNOTATIONS_KEYS
tags = {
provision_constants.TAG_RAY_CLUSTER_NAME: handle.cluster_name_on_cloud,
}
ray_config = common_utils.read_yaml(handle.cluster_yaml)
provider_config = ray_config['provider']
namespace = get_namespace(provider_config)
running_pods = filter_pods(namespace, tags)

for _, pod in running_pods.items():
if down:
for annotations_key in annotations_keys:
landscapepainter marked this conversation as resolved.
Show resolved Hide resolved
annotations: Dict[str, str] = {}
if annotations_key == IDLE_MINUTES_TO_AUTOSTOP_ANNOTATIONS_KEY:
annotations.update(
{annotations_key: str(idle_minutes_to_autostop)})
else: # annotation_key == AUTODOWN_ANNOTATIONS_KEY
annotations.update({annotations_key: str(True)})
landscapepainter marked this conversation as resolved.
Show resolved Hide resolved
_add_pod_annotations(pod=pod,
annotations=annotations,
namespace=namespace)

# If idle_minutes_to_autostop is negative, it indicates a request to
# cancel autostop using the --cancel flag with the `sky autostop`
# command.
elif (idle_minutes_to_autostop is not None and
idle_minutes_to_autostop < 0):
for annotations_key in annotations_keys:
_remove_pod_annotations(pod=pod,
annotations_key=annotations_key,
namespace=namespace)
Loading
Loading