Skip to content

Commit

Permalink
[k8s] Add cluster attributes(autodown, idle-minutes-to-autostop) as a…
Browse files Browse the repository at this point in the history
…nnotations to the pod (#3870)

* add autodown annotations to the k8s pod

* revert kubernetes ray template

* revert backend_utils from invasive approach

* nit

* revert from invasive approaches

* revert

* updated approach

* nit

* nit

* Use constant to represent idle_minutes_to_autostop for cancellation

* revert using constants for cancel

* nit

* nit

* add smoke tests

* Update sky/provision/kubernetes/utils.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* fix comments

* nit

* remove loops and annotate one by one

* format

* update with autodown annotation with context

* format

---------

Co-authored-by: Romil Bhardwaj <[email protected]>
  • Loading branch information
landscapepainter and romilbhardwaj authored Sep 26, 2024
1 parent 396e0fc commit d4f96e6
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 46 deletions.
8 changes: 8 additions & 0 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from sky.provision import instance_setup
from sky.provision import metadata_utils
from sky.provision import provisioner
from sky.provision.kubernetes import utils as kubernetes_utils
from sky.skylet import autostop_lib
from sky.skylet import constants
from sky.skylet import job_lib
Expand Down Expand Up @@ -4180,6 +4181,13 @@ def set_autostop(self,
global_user_state.set_cluster_autostop_value(
handle.cluster_name, idle_minutes_to_autostop, down)

# Add/Remove autodown annotations to/from Kubernetes pods.
if isinstance(handle.launched_resources.cloud, clouds.Kubernetes):
kubernetes_utils.set_autodown_annotations(
handle=handle,
idle_minutes_to_autostop=idle_minutes_to_autostop,
down=down)

def is_definitely_autostopping(self,
handle: CloudVmRayResourceHandle,
stream_logs: bool = True) -> bool:
Expand Down
61 changes: 15 additions & 46 deletions sky/provision/kubernetes/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,6 @@
TAG_SKYPILOT_CLUSTER_NAME = 'skypilot-cluster-name'
TAG_POD_INITIALIZED = 'skypilot-initialized'

POD_STATUSES = {
'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Terminating'
}


def to_label_selector(tags):
label_selector = ''
for k, v in tags.items():
if label_selector != '':
label_selector += ','
label_selector += '{}={}'.format(k, v)
return label_selector


def _filter_pods(namespace: str, context: str, tag_filters: Dict[str, str],
status_filters: Optional[List[str]]) -> Dict[str, Any]:
"""Filters pods by tags and status."""
non_included_pod_statuses = POD_STATUSES.copy()

field_selector = ''
if status_filters is not None:
non_included_pod_statuses -= set(status_filters)
field_selector = ','.join(
[f'status.phase!={status}' for status in non_included_pod_statuses])

label_selector = to_label_selector(tag_filters)
pod_list = kubernetes.core_api(context).list_namespaced_pod(
namespace, field_selector=field_selector, label_selector=label_selector)

# Don't return pods marked for deletion,
# i.e. pods with non-null metadata.DeletionTimestamp.
pods = [
pod for pod in pod_list.items if pod.metadata.deletion_timestamp is None
]
return {pod.metadata.name: pod for pod in pods}


def _get_head_pod_name(pods: Dict[str, Any]) -> Optional[str]:
head_pod_name = None
Expand Down Expand Up @@ -475,16 +439,17 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
pod_spec['metadata']['labels'].update(
{TAG_SKYPILOT_CLUSTER_NAME: cluster_name_on_cloud})

terminating_pods = _filter_pods(namespace, context, tags, ['Terminating'])
terminating_pods = kubernetes_utils.filter_pods(namespace, context, tags,
['Terminating'])
start_time = time.time()
while (len(terminating_pods) > 0 and
time.time() - start_time < _TIMEOUT_FOR_POD_TERMINATION):
logger.debug(f'run_instances: Found {len(terminating_pods)} '
'terminating pods. Waiting them to finish: '
f'{list(terminating_pods.keys())}')
time.sleep(POLL_INTERVAL)
terminating_pods = _filter_pods(namespace, context, tags,
['Terminating'])
terminating_pods = kubernetes_utils.filter_pods(namespace, context,
tags, ['Terminating'])

if len(terminating_pods) > 0:
# If there are still terminating pods, we force delete them.
Expand All @@ -501,8 +466,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
_request_timeout=config_lib.DELETION_TIMEOUT,
grace_period_seconds=0)

running_pods = _filter_pods(namespace, context, tags,
['Pending', 'Running'])
running_pods = kubernetes_utils.filter_pods(namespace, context, tags,
['Pending', 'Running'])
head_pod_name = _get_head_pod_name(running_pods)
logger.debug(f'Found {len(running_pods)} existing pods: '
f'{list(running_pods.keys())}')
Expand Down Expand Up @@ -583,7 +548,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
if head_pod_name is None:
head_pod_name = pod.metadata.name

wait_pods_dict = _filter_pods(namespace, context, tags, ['Pending'])
wait_pods_dict = kubernetes_utils.filter_pods(namespace, context, tags,
['Pending'])
wait_pods = list(wait_pods_dict.values())

networking_mode = network_utils.get_networking_mode(
Expand Down Expand Up @@ -613,8 +579,9 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
logger.debug(f'run_instances: all pods are scheduled and running: '
f'{list(wait_pods_dict.keys())}')

running_pods = _filter_pods(namespace, context, tags, ['Running'])
initialized_pods = _filter_pods(namespace, context, {
running_pods = kubernetes_utils.filter_pods(namespace, context, tags,
['Running'])
initialized_pods = kubernetes_utils.filter_pods(namespace, context, {
TAG_POD_INITIALIZED: 'true',
**tags
}, ['Running'])
Expand Down Expand Up @@ -722,7 +689,7 @@ def terminate_instances(
tag_filters = {
TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud,
}
pods = _filter_pods(namespace, context, tag_filters, None)
pods = kubernetes_utils.filter_pods(namespace, context, tag_filters, None)

def _is_head(pod) -> bool:
return pod.metadata.labels[constants.TAG_RAY_NODE_KIND] == 'head'
Expand All @@ -746,7 +713,9 @@ def get_cluster_info(
TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud,
}

running_pods = _filter_pods(namespace, context, tag_filters, ['Running'])
running_pods = kubernetes_utils.filter_pods(namespace, context, tag_filters,
['Running'])

pods: Dict[str, List[common.InstanceInfo]] = {}
head_pod_name = None

Expand Down
143 changes: 143 additions & 0 deletions sky/provision/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import re
import shutil
import subprocess
import typing
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from urllib.parse import urlparse

Expand All @@ -17,6 +18,7 @@
from sky import sky_logging
from sky import skypilot_config
from sky.adaptors import kubernetes
from sky.provision import constants as provision_constants
from sky.provision.kubernetes import network_utils
from sky.skylet import constants
from sky.utils import common_utils
Expand All @@ -25,6 +27,9 @@
from sky.utils import schemas
from sky.utils import ux_utils

if typing.TYPE_CHECKING:
from sky import backends

# TODO(romilb): Move constants to constants.py
DEFAULT_NAMESPACE = 'default'

Expand Down Expand Up @@ -64,6 +69,16 @@
PORT_FORWARD_PROXY_CMD_PATH = ('~/.sky/kubernetes-port-forward-proxy-command-'
f'v{PORT_FORWARD_PROXY_CMD_VERSION}.sh')

POD_STATUSES = {
'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Terminating'
}
AUTODOWN_ANNOTATION_KEY = 'skypilot.co/autodown'
IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY = (
'skypilot.co/idle_minutes_to_autostop')
ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG = ('Pod {pod_name} not found in namespace '
'{namespace} while trying to {action} '
'an annotation {annotation}.')

logger = sky_logging.init_logger(__name__)


Expand Down Expand Up @@ -1748,11 +1763,139 @@ def get_kubernetes_node_info() -> Dict[str, KubernetesNodeInfo]:
return node_info_dict


def to_label_selector(tags):
label_selector = ''
for k, v in tags.items():
if label_selector != '':
label_selector += ','
label_selector += '{}={}'.format(k, v)
return label_selector


def get_namespace_from_config(provider_config: Dict[str, Any]) -> str:
return provider_config.get('namespace',
get_current_kube_config_context_namespace())


def filter_pods(namespace: str,
context: str,
tag_filters: Dict[str, str],
status_filters: Optional[List[str]] = None) -> Dict[str, Any]:
"""Filters pods by tags and status."""
non_included_pod_statuses = POD_STATUSES.copy()

field_selector = ''
if status_filters is not None:
non_included_pod_statuses -= set(status_filters)
field_selector = ','.join(
[f'status.phase!={status}' for status in non_included_pod_statuses])

label_selector = to_label_selector(tag_filters)
pod_list = kubernetes.core_api(context).list_namespaced_pod(
namespace, field_selector=field_selector, label_selector=label_selector)

# Don't return pods marked for deletion,
# i.e. pods with non-null metadata.DeletionTimestamp.
pods = [
pod for pod in pod_list.items if pod.metadata.deletion_timestamp is None
]
return {pod.metadata.name: pod for pod in pods}


def _remove_pod_annotation(pod: Any, annotation_key: str,
namespace: str) -> None:
"""Removes specified Annotations from a Kubernetes pod."""
try:
# Remove the specified annotation
if pod.metadata.annotations:
if annotation_key in pod.metadata.annotations:
# Patch the pod with the updated metadata.
body = {'metadata': {'annotations': {annotation_key: None}}}
kubernetes.core_api().patch_namespaced_pod(
name=pod.metadata.name,
namespace=namespace,
body=body,
_request_timeout=kubernetes.API_TIMEOUT)

except kubernetes.api_exception() as e:
if e.status == 404:
logger.warning(
ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG.format(
pod_name=pod.metadata.name,
namespace=namespace,
action='remove',
annotation=annotation_key))
else:
with ux_utils.print_exception_no_traceback():
raise


def _add_pod_annotation(pod: Any, annotation: Dict[str, str],
namespace: str) -> None:
"""Adds specified Annotations on a Kubernetes pod."""
try:
# Patch the pod with the updated metadata
body = {'metadata': {'annotations': annotation}}
kubernetes.core_api().patch_namespaced_pod(
name=pod.metadata.name,
namespace=namespace,
body=body,
_request_timeout=kubernetes.API_TIMEOUT)

except kubernetes.api_exception() as e:
if e.status == 404:
logger.warning(
ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG.format(
pod_name=pod.metadata.name,
namespace=namespace,
action='add',
annotation=annotation))
else:
with ux_utils.print_exception_no_traceback():
raise


def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle',
idle_minutes_to_autostop: Optional[int],
down: bool = False) -> None:
"""Adds or removes Annotations of autodown on Kubernetes pods."""
tags = {
provision_constants.TAG_RAY_CLUSTER_NAME: handle.cluster_name_on_cloud,
}
ray_config = common_utils.read_yaml(handle.cluster_yaml)
provider_config = ray_config['provider']
namespace = get_namespace_from_config(provider_config)
context = get_context_from_config(provider_config)
running_pods = filter_pods(namespace, context, tags)

for _, pod in running_pods.items():
if down:
idle_minutes_to_autostop_annotation = {
IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY:
str(idle_minutes_to_autostop)
}
autodown_annotation = {AUTODOWN_ANNOTATION_KEY: 'true'}
_add_pod_annotation(pod=pod,
annotation=idle_minutes_to_autostop_annotation,
namespace=namespace)
_add_pod_annotation(pod=pod,
annotation=autodown_annotation,
namespace=namespace)

# If idle_minutes_to_autostop is negative, it indicates a request to
# cancel autostop using the --cancel flag with the `sky autostop`
# command.
elif (idle_minutes_to_autostop is not None and
idle_minutes_to_autostop < 0):
_remove_pod_annotation(
pod=pod,
annotation_key=IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY,
namespace=namespace)
_remove_pod_annotation(pod=pod,
annotation_key=AUTODOWN_ANNOTATION_KEY,
namespace=namespace)


def get_context_from_config(provider_config: Dict[str, Any]) -> str:
return provider_config.get('context',
get_current_kube_config_context_name())
58 changes: 58 additions & 0 deletions tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -2121,6 +2121,64 @@ def test_task_labels_kubernetes():
run_one_test(test)


# ---------- Pod Annotations on Kubernetes ----------
@pytest.mark.kubernetes
def test_add_pod_annotations_for_autodown_with_launch():
name = _get_cluster_name()
test = Test(
'add_pod_annotations_for_autodown_with_launch',
[
# Launch Kubernetes cluster with two nodes, each being head node and worker node.
# Autodown is set.
f'sky launch -y -c {name} -i 10 --down --num-nodes 2 --cpus=1 --cloud kubernetes',
# Get names of the pods containing cluster name.
f'pod_1=$(kubectl get pods -o name | grep {name} | sed -n 1p)',
f'pod_2=$(kubectl get pods -o name | grep {name} | sed -n 2p)',
# Describe the first pod and check for annotations.
'kubectl describe pod $pod_1 | grep -q skypilot.co/autodown',
'kubectl describe pod $pod_1 | grep -q skypilot.co/idle_minutes_to_autostop',
# Describe the second pod and check for annotations.
'kubectl describe pod $pod_2 | grep -q skypilot.co/autodown',
'kubectl describe pod $pod_2 | grep -q skypilot.co/idle_minutes_to_autostop'
],
f'sky down -y {name}',
)
run_one_test(test)


@pytest.mark.kubernetes
def test_add_and_remove_pod_annotations_with_autostop():
name = _get_cluster_name()
test = Test(
'add_and_remove_pod_annotations_with_autostop',
[
# Launch Kubernetes cluster with two nodes, each being head node and worker node.
f'sky launch -y -c {name} --num-nodes 2 --cpus=1 --cloud kubernetes',
# Set autodown on the cluster with 'autostop' command.
f'sky autostop -y {name} -i 20 --down',
# Get names of the pods containing cluster name.
f'pod_1=$(kubectl get pods -o name | grep {name} | sed -n 1p)',
f'pod_2=$(kubectl get pods -o name | grep {name} | sed -n 2p)',
# Describe the first pod and check for annotations.
'kubectl describe pod $pod_1 | grep -q skypilot.co/autodown',
'kubectl describe pod $pod_1 | grep -q skypilot.co/idle_minutes_to_autostop',
# Describe the second pod and check for annotations.
'kubectl describe pod $pod_2 | grep -q skypilot.co/autodown',
'kubectl describe pod $pod_2 | grep -q skypilot.co/idle_minutes_to_autostop',
# Cancel the set autodown to remove the annotations from the pods.
f'sky autostop -y {name} --cancel',
# Describe the first pod and check if annotations are removed.
'! kubectl describe pod $pod_1 | grep -q skypilot.co/autodown',
'! kubectl describe pod $pod_1 | grep -q skypilot.co/idle_minutes_to_autostop',
# Describe the second pod and check if annotations are removed.
'! kubectl describe pod $pod_2 | grep -q skypilot.co/autodown',
'! kubectl describe pod $pod_2 | grep -q skypilot.co/idle_minutes_to_autostop',
],
f'sky down -y {name}',
)
run_one_test(test)


# ---------- Container logs from task on Kubernetes ----------
@pytest.mark.kubernetes
def test_container_logs_multinode_kubernetes():
Expand Down

0 comments on commit d4f96e6

Please sign in to comment.