Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[k8s] Add cluster attributes(autodown, idle-minutes-to-autostop) as annotations to the pod #3870

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from sky.provision import instance_setup
from sky.provision import metadata_utils
from sky.provision import provisioner
from sky.provision.kubernetes import utils as kubernetes_utils
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we quickly check that this global import does not break SkyPilot when the Kubernetes dependencies are not installed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested by running `sky launch --cloud gcp -y` in a fresh environment where only `pip install -e ".[gcp]"` had been run; no issue was raised.

from sky.skylet import autostop_lib
from sky.skylet import constants
from sky.skylet import job_lib
Expand Down Expand Up @@ -4170,6 +4171,13 @@ def set_autostop(self,
global_user_state.set_cluster_autostop_value(
handle.cluster_name, idle_minutes_to_autostop, down)

# Add/Remove autodown annotations to/from Kubernetes pods.
if isinstance(handle.launched_resources.cloud, clouds.Kubernetes):
kubernetes_utils.set_autodown_annotations(
handle=handle,
idle_minutes_to_autostop=idle_minutes_to_autostop,
down=down)

def is_definitely_autostopping(self,
handle: CloudVmRayResourceHandle,
stream_logs: bool = True) -> bool:
Expand Down
61 changes: 15 additions & 46 deletions sky/provision/kubernetes/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,6 @@
# Pod label key; _create_pods sets it to the cluster's name on the cloud.
TAG_SKYPILOT_CLUSTER_NAME = 'skypilot-cluster-name'
# Pod label key; matched against the value 'true' when listing initialized
# pods.
TAG_POD_INITIALIZED = 'skypilot-initialized'

# Status names that _filter_pods can exclude through `status.phase!=` field
# selectors.
POD_STATUSES = {
'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Terminating'
}


def to_label_selector(tags: Dict[str, str]) -> str:
    """Builds a Kubernetes label selector string from a tag mapping.

    Args:
        tags: Mapping of label key to the label value it must equal.

    Returns:
        Comma-separated ``key=value`` pairs in the mapping's iteration
        order, or an empty string when ``tags`` is empty.
    """
    # str.join places the separators itself, so no "is this the first
    # item?" bookkeeping or repeated string concatenation is needed.
    return ','.join(f'{k}={v}' for k, v in tags.items())


def _filter_pods(namespace: str, context: str, tag_filters: Dict[str, str],
                 status_filters: Optional[List[str]]) -> Dict[str, Any]:
    """Lists pods matching the given labels and, optionally, statuses.

    Args:
        namespace: Namespace to list pods in.
        context: Kubernetes context passed to the core API client.
        tag_filters: Label key/value pairs every returned pod must carry.
        status_filters: Statuses to keep. None disables status filtering.

    Returns:
        Mapping of pod name to pod object. Pods that already have a
        deletion timestamp are left out.
    """
    if status_filters is None:
        field_selector = ''
    else:
        # The selector is expressed negatively: every known status that was
        # not requested gets its own `status.phase!=` clause.
        excluded_statuses = POD_STATUSES.copy()
        excluded_statuses.difference_update(status_filters)
        field_selector = ','.join(
            f'status.phase!={status}' for status in excluded_statuses)

    label_selector = to_label_selector(tag_filters)
    listing = kubernetes.core_api(context).list_namespaced_pod(
        namespace, field_selector=field_selector, label_selector=label_selector)

    # A non-null metadata.deletion_timestamp means the pod is being deleted.
    return {
        pod.metadata.name: pod
        for pod in listing.items
        if pod.metadata.deletion_timestamp is None
    }


def _get_head_pod_name(pods: Dict[str, Any]) -> Optional[str]:
head_pod_name = None
Expand Down Expand Up @@ -475,16 +439,17 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
pod_spec['metadata']['labels'].update(
{TAG_SKYPILOT_CLUSTER_NAME: cluster_name_on_cloud})

terminating_pods = _filter_pods(namespace, context, tags, ['Terminating'])
terminating_pods = kubernetes_utils.filter_pods(namespace, context, tags,
['Terminating'])
start_time = time.time()
while (len(terminating_pods) > 0 and
time.time() - start_time < _TIMEOUT_FOR_POD_TERMINATION):
logger.debug(f'run_instances: Found {len(terminating_pods)} '
'terminating pods. Waiting them to finish: '
f'{list(terminating_pods.keys())}')
time.sleep(POLL_INTERVAL)
terminating_pods = _filter_pods(namespace, context, tags,
['Terminating'])
terminating_pods = kubernetes_utils.filter_pods(namespace, context,
tags, ['Terminating'])

if len(terminating_pods) > 0:
# If there are still terminating pods, we force delete them.
Expand All @@ -501,8 +466,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
_request_timeout=config_lib.DELETION_TIMEOUT,
grace_period_seconds=0)

running_pods = _filter_pods(namespace, context, tags,
['Pending', 'Running'])
running_pods = kubernetes_utils.filter_pods(namespace, context, tags,
['Pending', 'Running'])
head_pod_name = _get_head_pod_name(running_pods)
logger.debug(f'Found {len(running_pods)} existing pods: '
f'{list(running_pods.keys())}')
Expand Down Expand Up @@ -583,7 +548,8 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
if head_pod_name is None:
head_pod_name = pod.metadata.name

wait_pods_dict = _filter_pods(namespace, context, tags, ['Pending'])
wait_pods_dict = kubernetes_utils.filter_pods(namespace, context, tags,
['Pending'])
wait_pods = list(wait_pods_dict.values())

networking_mode = network_utils.get_networking_mode(
Expand Down Expand Up @@ -613,8 +579,9 @@ def _create_pods(region: str, cluster_name_on_cloud: str,
logger.debug(f'run_instances: all pods are scheduled and running: '
f'{list(wait_pods_dict.keys())}')

running_pods = _filter_pods(namespace, context, tags, ['Running'])
initialized_pods = _filter_pods(namespace, context, {
running_pods = kubernetes_utils.filter_pods(namespace, context, tags,
['Running'])
initialized_pods = kubernetes_utils.filter_pods(namespace, context, {
TAG_POD_INITIALIZED: 'true',
**tags
}, ['Running'])
Expand Down Expand Up @@ -722,7 +689,7 @@ def terminate_instances(
tag_filters = {
TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud,
}
pods = _filter_pods(namespace, context, tag_filters, None)
pods = kubernetes_utils.filter_pods(namespace, context, tag_filters, None)

def _is_head(pod) -> bool:
return pod.metadata.labels[constants.TAG_RAY_NODE_KIND] == 'head'
Expand All @@ -746,7 +713,9 @@ def get_cluster_info(
TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud,
}

running_pods = _filter_pods(namespace, context, tag_filters, ['Running'])
running_pods = kubernetes_utils.filter_pods(namespace, context, tag_filters,
['Running'])

pods: Dict[str, List[common.InstanceInfo]] = {}
head_pod_name = None

Expand Down
143 changes: 143 additions & 0 deletions sky/provision/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import re
import shutil
import subprocess
import typing
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from urllib.parse import urlparse

Expand All @@ -17,6 +18,7 @@
from sky import sky_logging
from sky import skypilot_config
from sky.adaptors import kubernetes
from sky.provision import constants as provision_constants
from sky.provision.kubernetes import network_utils
from sky.skylet import constants
from sky.utils import common_utils
Expand All @@ -25,6 +27,9 @@
from sky.utils import schemas
from sky.utils import ux_utils

if typing.TYPE_CHECKING:
from sky import backends

# TODO(romilb): Move constants to constants.py
DEFAULT_NAMESPACE = 'default'

Expand Down Expand Up @@ -64,6 +69,16 @@
PORT_FORWARD_PROXY_CMD_PATH = ('~/.sky/kubernetes-port-forward-proxy-command-'
f'v{PORT_FORWARD_PROXY_CMD_VERSION}.sh')

# Status names that filter_pods can exclude through `status.phase!=` field
# selectors.
# NOTE(review): 'Terminating' does not look like a standard Kubernetes pod
# phase (the documented ones are Pending/Running/Succeeded/Failed/Unknown) —
# confirm it is intentional.
POD_STATUSES = {
'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown', 'Terminating'
}
# Annotation keys that set_autodown_annotations adds to / removes from pods.
AUTODOWN_ANNOTATION_KEY = 'skypilot.co/autodown'
IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY = (
'skypilot.co/idle_minutes_to_autostop')
# Warning template logged when patching a pod's annotations returns HTTP 404.
ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG = ('Pod {pod_name} not found in namespace '
'{namespace} while trying to {action} '
'an annotation {annotation}.')

logger = sky_logging.init_logger(__name__)


Expand Down Expand Up @@ -1748,11 +1763,139 @@ def get_kubernetes_node_info() -> Dict[str, KubernetesNodeInfo]:
return node_info_dict


def to_label_selector(tags: Dict[str, str]) -> str:
    """Builds a Kubernetes label selector string from a tag mapping.

    Args:
        tags: Mapping of label key to the label value it must equal.

    Returns:
        Comma-separated ``key=value`` pairs in the mapping's iteration
        order, or an empty string when ``tags`` is empty.
    """
    # str.join places the separators itself, so no "is this the first
    # item?" bookkeeping or repeated string concatenation is needed.
    return ','.join(f'{k}={v}' for k, v in tags.items())


def get_namespace_from_config(provider_config: Dict[str, Any]) -> str:
    """Returns the namespace to use for a provider config.

    Args:
        provider_config: Provider section of the cluster config.

    Returns:
        The value stored under 'namespace' when that key is present,
        otherwise the result of get_current_kube_config_context_namespace().
    """
    # Resolve the fallback lazily. Passing it as the default argument of
    # dict.get() would evaluate the kubeconfig lookup on every call, even
    # when the config already names a namespace.
    if 'namespace' in provider_config:
        return provider_config['namespace']
    return get_current_kube_config_context_namespace()


def filter_pods(namespace: str,
                context: str,
                tag_filters: Dict[str, str],
                status_filters: Optional[List[str]] = None) -> Dict[str, Any]:
    """Lists pods matching the given labels and, optionally, statuses.

    Args:
        namespace: Namespace to list pods in.
        context: Kubernetes context passed to the core API client.
        tag_filters: Label key/value pairs every returned pod must carry.
        status_filters: Statuses to keep. None (the default) disables
            status filtering.

    Returns:
        Mapping of pod name to pod object. Pods that already have a
        deletion timestamp are left out.
    """
    if status_filters is None:
        field_selector = ''
    else:
        # The selector is expressed negatively: every known status that was
        # not requested gets its own `status.phase!=` clause.
        excluded_statuses = POD_STATUSES.copy()
        excluded_statuses.difference_update(status_filters)
        field_selector = ','.join(
            f'status.phase!={status}' for status in excluded_statuses)

    label_selector = to_label_selector(tag_filters)
    listing = kubernetes.core_api(context).list_namespaced_pod(
        namespace, field_selector=field_selector, label_selector=label_selector)

    # A non-null metadata.deletion_timestamp means the pod is being deleted.
    return {
        pod.metadata.name: pod
        for pod in listing.items
        if pod.metadata.deletion_timestamp is None
    }


def _remove_pod_annotation(pod: Any,
                           annotation_key: str,
                           namespace: str,
                           context: Optional[str] = None) -> None:
    """Removes one annotation from a Kubernetes pod, if it is present.

    Args:
        pod: Pod object; its metadata.name and metadata.annotations are read.
        annotation_key: Key of the annotation to remove.
        namespace: Namespace the pod lives in.
        context: Kubernetes context to send the patch to. When None, the
            adaptor's default client (kubernetes.core_api()) is used, which
            keeps existing three-argument callers working.

    Raises:
        The Kubernetes API exception for any failure other than HTTP 404;
        a 404 (pod not found) is only logged as a warning.
    """
    try:
        annotations = pod.metadata.annotations
        if annotations and annotation_key in annotations:
            api = (kubernetes.core_api()
                   if context is None else kubernetes.core_api(context))
            # A null value in the patch asks the API server to drop the key.
            body = {'metadata': {'annotations': {annotation_key: None}}}
            api.patch_namespaced_pod(name=pod.metadata.name,
                                     namespace=namespace,
                                     body=body,
                                     _request_timeout=kubernetes.API_TIMEOUT)

    except kubernetes.api_exception() as e:
        if e.status == 404:
            logger.warning(
                ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG.format(
                    pod_name=pod.metadata.name,
                    namespace=namespace,
                    action='remove',
                    annotation=annotation_key))
        else:
            with ux_utils.print_exception_no_traceback():
                raise


def _add_pod_annotation(pod: Any,
                        annotation: Dict[str, str],
                        namespace: str,
                        context: Optional[str] = None) -> None:
    """Adds (or overwrites) annotations on a Kubernetes pod.

    Args:
        pod: Pod object; only its metadata.name is read.
        annotation: Annotation key/value pairs to set in a single patch.
        namespace: Namespace the pod lives in.
        context: Kubernetes context to send the patch to. When None, the
            adaptor's default client (kubernetes.core_api()) is used, which
            keeps existing three-argument callers working.

    Raises:
        The Kubernetes API exception for any failure other than HTTP 404;
        a 404 (pod not found) is only logged as a warning.
    """
    try:
        api = (kubernetes.core_api()
               if context is None else kubernetes.core_api(context))
        body = {'metadata': {'annotations': annotation}}
        api.patch_namespaced_pod(name=pod.metadata.name,
                                 namespace=namespace,
                                 body=body,
                                 _request_timeout=kubernetes.API_TIMEOUT)

    except kubernetes.api_exception() as e:
        if e.status == 404:
            logger.warning(
                ANNOTATIONS_POD_NOT_FOUND_ERROR_MSG.format(
                    pod_name=pod.metadata.name,
                    namespace=namespace,
                    action='add',
                    annotation=annotation))
        else:
            with ux_utils.print_exception_no_traceback():
                raise


def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle',
                             idle_minutes_to_autostop: Optional[int],
                             down: bool = False) -> None:
    """Adds or removes the autodown annotations on a cluster's pods.

    With ``down`` set, every pod of the cluster is annotated with the
    autodown flag and the idle-minutes value. Without ``down``, a negative
    ``idle_minutes_to_autostop`` (autostop cancelled) removes both
    annotations. Any other combination leaves the pods untouched.

    Args:
        handle: Resource handle of the cluster; its cluster_name_on_cloud
            and cluster_yaml are read.
        idle_minutes_to_autostop: Idle minutes before autostop; a negative
            value indicates a cancel request.
        down: Whether autodown is being enabled.
    """
    # If idle_minutes_to_autostop is negative, it indicates a request to
    # cancel autostop using the --cancel flag with the `sky autostop`
    # command.
    cancel = (not down and idle_minutes_to_autostop is not None and
              idle_minutes_to_autostop < 0)
    if not down and not cancel:
        # Nothing to add or remove: skip the YAML read and the API calls.
        return

    tags = {
        provision_constants.TAG_RAY_CLUSTER_NAME: handle.cluster_name_on_cloud,
    }
    ray_config = common_utils.read_yaml(handle.cluster_yaml)
    provider_config = ray_config['provider']
    namespace = get_namespace_from_config(provider_config)
    context = get_context_from_config(provider_config)
    # No status filter: every pod of the cluster that is not being deleted.
    pods = filter_pods(namespace, context, tags)

    if down:
        # Both annotations go out in one patch so a pod is never left with
        # only one of them set.
        autodown_annotations = {
            IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY:
                str(idle_minutes_to_autostop),
            AUTODOWN_ANNOTATION_KEY: 'true',
        }
        for pod in pods.values():
            # Patch through the same context the pods were listed from.
            _add_pod_annotation(pod=pod,
                                annotation=autodown_annotations,
                                namespace=namespace,
                                context=context)
        return

    for pod in pods.values():
        for annotation_key in (IDLE_MINUTES_TO_AUTOSTOP_ANNOTATION_KEY,
                               AUTODOWN_ANNOTATION_KEY):
            _remove_pod_annotation(pod=pod,
                                   annotation_key=annotation_key,
                                   namespace=namespace,
                                   context=context)


def get_context_from_config(provider_config: Dict[str, Any]) -> str:
    """Returns the Kubernetes context to use for a provider config.

    Args:
        provider_config: Provider section of the cluster config.

    Returns:
        The value stored under 'context' when that key is present,
        otherwise the result of get_current_kube_config_context_name().
    """
    # Resolve the fallback lazily. Passing it as the default argument of
    # dict.get() would evaluate the kubeconfig lookup on every call, even
    # when the config already names a context.
    if 'context' in provider_config:
        return provider_config['context']
    return get_current_kube_config_context_name()
58 changes: 58 additions & 0 deletions tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -2121,6 +2121,64 @@ def test_task_labels_kubernetes():
run_one_test(test)


# ---------- Pod Annotations on Kubernetes ----------
@pytest.mark.kubernetes
def test_add_pod_annotations_for_autodown_with_launch():
    """Checks that `sky launch --down` annotates both pods of the cluster."""
    name = _get_cluster_name()

    # Launch a two-node Kubernetes cluster (one head pod, one worker pod)
    # with autodown enabled.
    launch_commands = [
        f'sky launch -y -c {name} -i 10 --down --num-nodes 2 --cpus=1 --cloud kubernetes',
    ]
    # Look up the names of the pods that contain the cluster name.
    # NOTE(review): the checks below rely on $pod_1/$pod_2 still being set
    # when later commands run; confirm run_one_test executes all commands in
    # one shell session.
    pod_lookup_commands = [
        f'pod_1=$(kubectl get pods -o name | grep {name} | sed -n 1p)',
        f'pod_2=$(kubectl get pods -o name | grep {name} | sed -n 2p)',
    ]
    # Both annotations must show up in the description of each pod.
    annotation_checks = [
        'kubectl describe pod $pod_1 | grep -q skypilot.co/autodown',
        'kubectl describe pod $pod_1 | grep -q skypilot.co/idle_minutes_to_autostop',
        'kubectl describe pod $pod_2 | grep -q skypilot.co/autodown',
        'kubectl describe pod $pod_2 | grep -q skypilot.co/idle_minutes_to_autostop',
    ]

    test = Test(
        'add_pod_annotations_for_autodown_with_launch',
        launch_commands + pod_lookup_commands + annotation_checks,
        f'sky down -y {name}',
    )
    run_one_test(test)


@pytest.mark.kubernetes
def test_add_and_remove_pod_annotations_with_autostop():
    """Checks that `sky autostop` adds, and `--cancel` removes, annotations."""
    name = _get_cluster_name()

    # Launch a two-node Kubernetes cluster (one head pod, one worker pod),
    # then enable autodown on it with the `autostop` command.
    setup_commands = [
        f'sky launch -y -c {name} --num-nodes 2 --cpus=1 --cloud kubernetes',
        f'sky autostop -y {name} -i 20 --down',
    ]
    # Look up the names of the pods that contain the cluster name.
    # NOTE(review): the checks below rely on $pod_1/$pod_2 still being set
    # when later commands run; confirm run_one_test executes all commands in
    # one shell session.
    pod_lookup_commands = [
        f'pod_1=$(kubectl get pods -o name | grep {name} | sed -n 1p)',
        f'pod_2=$(kubectl get pods -o name | grep {name} | sed -n 2p)',
    ]
    # Both annotations must show up in the description of each pod.
    annotations_present_checks = [
        'kubectl describe pod $pod_1 | grep -q skypilot.co/autodown',
        'kubectl describe pod $pod_1 | grep -q skypilot.co/idle_minutes_to_autostop',
        'kubectl describe pod $pod_2 | grep -q skypilot.co/autodown',
        'kubectl describe pod $pod_2 | grep -q skypilot.co/idle_minutes_to_autostop',
    ]
    # Cancelling autodown should remove the annotations from the pods.
    cancel_commands = [
        f'sky autostop -y {name} --cancel',
    ]
    # Neither annotation may remain in the description of either pod.
    annotations_absent_checks = [
        '! kubectl describe pod $pod_1 | grep -q skypilot.co/autodown',
        '! kubectl describe pod $pod_1 | grep -q skypilot.co/idle_minutes_to_autostop',
        '! kubectl describe pod $pod_2 | grep -q skypilot.co/autodown',
        '! kubectl describe pod $pod_2 | grep -q skypilot.co/idle_minutes_to_autostop',
    ]

    test = Test(
        'add_and_remove_pod_annotations_with_autostop',
        (setup_commands + pod_lookup_commands + annotations_present_checks +
         cancel_commands + annotations_absent_checks),
        f'sky down -y {name}',
    )
    run_one_test(test)


# ---------- Container logs from task on Kubernetes ----------
@pytest.mark.kubernetes
def test_container_logs_multinode_kubernetes():
Expand Down
Loading