Skip to content

Commit

Permalink
[k8s] Support in-cluster and kubeconfig auth simultaneously (#4188)
Browse files Browse the repository at this point in the history
* per-context SA + incluster auth fixes

* lint

* Support both incluster and kubeconfig

* wip

* Ignore kubeconfig when context is not specified, add su, mounting kubeconfig

* lint

* comments

* fix merge issues

* lint
  • Loading branch information
romilbhardwaj authored Nov 25, 2024
1 parent ef2233b commit 76e20b6
Show file tree
Hide file tree
Showing 13 changed files with 166 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def update_current_kubernetes_clusters_from_registry():
def get_allowed_contexts():
    """Mock implementation of getting allowed kubernetes contexts.

    Returns:
        At most the first two context names reported by the kubernetes
        utils module, to exercise allowed-context filtering in tests.
    """
    from sky.provision.kubernetes import utils
    # Use the renamed API (get_all_kube_context_names); the previous
    # get_all_kube_config_context_names call was removed in this change.
    contexts = utils.get_all_kube_context_names()
    return contexts[:2]


Expand Down
44 changes: 34 additions & 10 deletions sky/adaptors/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
# Timeout to use for API calls
API_TIMEOUT: int = 5

# Default context/region name used to represent in-cluster auth when no
# explicit name is configured via the environment variable below.
DEFAULT_IN_CLUSTER_REGION: str = 'in-cluster'
# The name for the environment variable that stores the in-cluster context name
# for Kubernetes clusters. This is used to associate a name with the current
# context when running with in-cluster auth. If not set, the context name is
# set to DEFAULT_IN_CLUSTER_REGION.
IN_CLUSTER_CONTEXT_NAME_ENV_VAR: str = 'SKYPILOT_IN_CLUSTER_CONTEXT_NAME'


def _decorate_methods(obj: Any, decorator: Callable, decoration_type: str):
for attr_name in dir(obj):
Expand Down Expand Up @@ -57,16 +64,8 @@ def wrapped(*args, **kwargs):

def _load_config(context: Optional[str] = None):
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
try:
# Load in-cluster config if running in a pod
# Kubernetes set environment variables for service discovery do not
# show up in SkyPilot tasks. For now, we work around by using
# DNS name instead of environment variables.
# See issue: https://github.com/skypilot-org/skypilot/issues/2287
os.environ['KUBERNETES_SERVICE_HOST'] = 'kubernetes.default.svc'
os.environ['KUBERNETES_SERVICE_PORT'] = '443'
kubernetes.config.load_incluster_config()
except kubernetes.config.config_exception.ConfigException:

def _load_config_from_kubeconfig(context: Optional[str] = None):
try:
kubernetes.config.load_kube_config(context=context)
except kubernetes.config.config_exception.ConfigException as e:
Expand All @@ -90,6 +89,21 @@ def _load_config(context: Optional[str] = None):
with ux_utils.print_exception_no_traceback():
raise ValueError(err_str) from None

if context == in_cluster_context_name() or context is None:
try:
# Load in-cluster config if running in a pod and context is None.
# Kubernetes set environment variables for service discovery do not
# show up in SkyPilot tasks. For now, we work around by using
# DNS name instead of environment variables.
# See issue: https://github.com/skypilot-org/skypilot/issues/2287
os.environ['KUBERNETES_SERVICE_HOST'] = 'kubernetes.default.svc'
os.environ['KUBERNETES_SERVICE_PORT'] = '443'
kubernetes.config.load_incluster_config()
except kubernetes.config.config_exception.ConfigException:
_load_config_from_kubeconfig()
else:
_load_config_from_kubeconfig(context)


@_api_logging_decorator('urllib3', logging.ERROR)
@functools.lru_cache()
Expand Down Expand Up @@ -154,3 +168,13 @@ def max_retry_error():

def stream():
return kubernetes.stream.stream


def in_cluster_context_name() -> str:
    """Returns the name of the in-cluster context.

    Reads the name from the IN_CLUSTER_CONTEXT_NAME_ENV_VAR environment
    variable, falling back to DEFAULT_IN_CLUSTER_REGION when the variable
    is unset or empty.

    Returns:
        str: The in-cluster context name. Always a non-empty string — the
            `or` fallback guarantees None is never returned, so the return
            annotation is `str` rather than `Optional[str]`.
    """
    return (os.environ.get(IN_CLUSTER_CONTEXT_NAME_ENV_VAR) or
            DEFAULT_IN_CLUSTER_REGION)
4 changes: 2 additions & 2 deletions sky/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]:
secret_field_name = clouds.Kubernetes().ssh_key_secret_field_name
context = config['provider'].get(
'context', kubernetes_utils.get_current_kube_config_context_name())
if context == kubernetes_utils.IN_CLUSTER_REGION:
# If the context is set to IN_CLUSTER_REGION, we are running in a pod
if context == kubernetes.in_cluster_context_name():
# If the context is an in-cluster context name, we are running in a pod
# with in-cluster configuration. We need to set the context to None
# to use the mounted service account.
context = None
Expand Down
7 changes: 6 additions & 1 deletion sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,12 @@ def write_cluster_config(
f'{skypilot_config.loaded_config_path!r} for {cloud}, but it '
'is not supported by this cloud. Remove the config or set: '
'`remote_identity: LOCAL_CREDENTIALS`.')
excluded_clouds.add(cloud)
if isinstance(cloud, clouds.Kubernetes):
if skypilot_config.get_nested(
('kubernetes', 'allowed_contexts'), None) is None:
excluded_clouds.add(cloud)
else:
excluded_clouds.add(cloud)

for cloud_str, cloud_obj in cloud_registry.CLOUD_REGISTRY.items():
remote_identity_config = skypilot_config.get_nested(
Expand Down
60 changes: 34 additions & 26 deletions sky/clouds/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,32 +130,30 @@ def _log_skipped_contexts_once(cls, skipped_contexts: Tuple[str,
'Ignoring these contexts.')

@classmethod
def _existing_allowed_contexts(cls) -> List[Optional[str]]:
def _existing_allowed_contexts(cls) -> List[str]:
"""Get existing allowed contexts.
If None is returned in the list, it means that we are running in a pod
with in-cluster auth. In this case, we specify None context, which will
use the service account mounted in the pod.
"""
all_contexts = kubernetes_utils.get_all_kube_config_context_names()
all_contexts = kubernetes_utils.get_all_kube_context_names()
if len(all_contexts) == 0:
return []
if all_contexts == [None]:
# If only one context is found and it is None, we are running in a
# pod with in-cluster auth. In this case, we allow it to be used
# without checking against allowed_contexts.
# TODO(romilb): We may want check in-cluster auth against
# allowed_contexts in the future by adding a special context name
# for in-cluster auth.
return [None]

all_contexts = set(all_contexts)

allowed_contexts = skypilot_config.get_nested(
('kubernetes', 'allowed_contexts'), None)

if allowed_contexts is None:
# Try kubeconfig if present
current_context = (
kubernetes_utils.get_current_kube_config_context_name())
if (current_context is None and
kubernetes_utils.is_incluster_config_available()):
# If no kubeconfig contexts found, use in-cluster if available
current_context = kubernetes.in_cluster_context_name()
allowed_contexts = []
if current_context is not None:
allowed_contexts = [current_context]
Expand All @@ -180,13 +178,7 @@ def regions_with_offering(cls, instance_type: Optional[str],

regions = []
for context in existing_contexts:
if context is None:
# If running in-cluster, we allow the region to be set to the
# singleton region since there is no context name available.
regions.append(clouds.Region(
kubernetes_utils.IN_CLUSTER_REGION))
else:
regions.append(clouds.Region(context))
regions.append(clouds.Region(context))

if region is not None:
regions = [r for r in regions if r.name == region]
Expand Down Expand Up @@ -409,20 +401,32 @@ def make_deploy_resources_variables(
remote_identity = skypilot_config.get_nested(
('kubernetes', 'remote_identity'),
schemas.get_default_remote_identity('kubernetes'))
if (remote_identity ==

if isinstance(remote_identity, dict):
# If remote_identity is a dict, use the service account for the
# current context
k8s_service_account_name = remote_identity.get(context, None)
if k8s_service_account_name is None:
err_msg = (f'Context {context!r} not found in '
'remote identities from config.yaml')
raise ValueError(err_msg)
else:
# If remote_identity is not a dict, use it directly as the
# service account name.
k8s_service_account_name = remote_identity

if (k8s_service_account_name ==
schemas.RemoteIdentityOptions.LOCAL_CREDENTIALS.value):
# SA name doesn't matter since automounting credentials is disabled
k8s_service_account_name = 'default'
k8s_automount_sa_token = 'false'
elif (remote_identity ==
elif (k8s_service_account_name ==
schemas.RemoteIdentityOptions.SERVICE_ACCOUNT.value):
# Use the default service account
k8s_service_account_name = (
kubernetes_utils.DEFAULT_SERVICE_ACCOUNT_NAME)
k8s_automount_sa_token = 'true'
else:
# User specified a custom service account
k8s_service_account_name = remote_identity
k8s_automount_sa_token = 'true'

fuse_device_required = bool(resources.requires_fuse)
Expand All @@ -447,6 +451,12 @@ def make_deploy_resources_variables(
('kubernetes', 'provision_timeout'),
timeout,
override_configs=resources.cluster_config_overrides)

# Set environment variables for the pod. Note that SkyPilot env vars
# are set separately when the task is run. These env vars are
# independent of the SkyPilot task to be run.
k8s_env_vars = {kubernetes.IN_CLUSTER_CONTEXT_NAME_ENV_VAR: context}

# We specify object-store-memory to be 500MB to avoid taking up too
# much memory on the head node. 'num-cpus' should be set to limit
# the CPU usage on the head pod, otherwise the ray cluster will use the
Expand Down Expand Up @@ -480,6 +490,7 @@ def make_deploy_resources_variables(
'k8s_topology_label_key': k8s_topology_label_key,
'k8s_topology_label_value': k8s_topology_label_value,
'k8s_resource_key': k8s_resource_key,
'k8s_env_vars': k8s_env_vars,
'image_id': image_id,
'ray_installation_commands': constants.RAY_INSTALLATION_COMMANDS,
'ray_head_start_command': instance_setup.ray_head_start_command(
Expand Down Expand Up @@ -625,16 +636,13 @@ def validate_region_zone(self, region: Optional[str], zone: Optional[str]):
# TODO: Remove this after 0.9.0.
return region, zone

if region == kubernetes_utils.IN_CLUSTER_REGION:
if region == kubernetes.in_cluster_context_name():
# If running in-cluster, accept the in-cluster context name as
# the region since there is no kubeconfig context available.
return region, zone

all_contexts = kubernetes_utils.get_all_kube_config_context_names()
if all_contexts == [None]:
# If [None] context is returned, use the singleton region since we
# are running in a pod with in-cluster auth.
all_contexts = [kubernetes_utils.IN_CLUSTER_REGION]
all_contexts = kubernetes_utils.get_all_kube_context_names()

if region not in all_contexts:
raise ValueError(
f'Context {region} not found in kubeconfig. Kubernetes only '
Expand Down
34 changes: 20 additions & 14 deletions sky/provision/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

# TODO(romilb): Move constants to constants.py
DEFAULT_NAMESPACE = 'default'
IN_CLUSTER_REGION = 'in-cluster'

DEFAULT_SERVICE_ACCOUNT_NAME = 'skypilot-service-account'

Expand Down Expand Up @@ -921,6 +920,9 @@ def is_kubeconfig_exec_auth(
str: Error message if exec-based authentication is used, None otherwise
"""
k8s = kubernetes.kubernetes
if context == kubernetes.in_cluster_context_name():
# If in-cluster config is used, exec-based auth is not used.
return False, None
try:
k8s.config.load_kube_config()
except kubernetes.config_exception():
Expand Down Expand Up @@ -1003,30 +1005,34 @@ def is_incluster_config_available() -> bool:
return os.path.exists('/var/run/secrets/kubernetes.io/serviceaccount/token')


def get_all_kube_context_names() -> List[str]:
    """Get all kubernetes context names available in the environment.

    Fetches context names from the kubeconfig file and in-cluster auth,
    if any. If running in-cluster and IN_CLUSTER_CONTEXT_NAME_ENV_VAR is
    not set, the default in-cluster kubernetes context name is included.

    We should not cache the result of this function as the admin policy
    may update the contexts.

    Returns:
        List[str]: The list of kubernetes context names if available,
            an empty list otherwise.
    """
    k8s = kubernetes.kubernetes
    context_names = []
    try:
        all_contexts, _ = k8s.config.list_kube_config_contexts()
        # all_contexts will always have at least one context. If kubeconfig
        # does not have any contexts defined, it will raise ConfigException.
        context_names = [context['name'] for context in all_contexts]
    except k8s.config.config_exception.ConfigException:
        # No kubeconfig contexts found; fall through and check for
        # in-cluster auth below.
        pass
    if is_incluster_config_available():
        context_names.append(kubernetes.in_cluster_context_name())
    return context_names


@functools.lru_cache()
Expand Down Expand Up @@ -2185,9 +2191,9 @@ def set_autodown_annotations(handle: 'backends.CloudVmRayResourceHandle',
def get_context_from_config(provider_config: Dict[str, Any]) -> Optional[str]:
context = provider_config.get('context',
get_current_kube_config_context_name())
if context == IN_CLUSTER_REGION:
# If the context (also used as the region) is set to IN_CLUSTER_REGION
# we need to use in-cluster auth.
if context == kubernetes.in_cluster_context_name():
# If the context (also used as the region) is in-cluster, we need
# to use in-cluster auth by setting the context to None.
context = None
return context

Expand Down
5 changes: 5 additions & 0 deletions sky/templates/kubernetes-port-forward-proxy-command.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ KUBECTL_ARGS=()
if [ -n "$KUBE_CONTEXT" ]; then
KUBECTL_ARGS+=("--context=$KUBE_CONTEXT")
fi
# If context is not provided, it means we are using incluster auth. In this case,
# we need to set KUBECONFIG to /dev/null to avoid using kubeconfig file.
if [ -z "$KUBE_CONTEXT" ]; then
KUBECTL_ARGS+=("--kubeconfig=/dev/null")
fi
if [ -n "$KUBE_NAMESPACE" ]; then
KUBECTL_ARGS+=("--namespace=$KUBE_NAMESPACE")
fi
Expand Down
4 changes: 4 additions & 0 deletions sky/templates/kubernetes-ray.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ available_node_types:
valueFrom:
fieldRef:
fieldPath: metadata.labels['ray-node-type']
{% for key, value in k8s_env_vars.items() if k8s_env_vars is not none %}
- name: {{ key }}
value: {{ value }}
{% endfor %}
# Do not change this command - it keeps the pod alive until it is
# explicitly killed.
command: ["/bin/bash", "-c", "--"]
Expand Down
4 changes: 4 additions & 0 deletions sky/utils/command_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,10 @@ def run(
]
if self.context:
kubectl_args += ['--context', self.context]
# If context is none, it means we are using incluster auth. In this
# case, need to set KUBECONFIG to /dev/null to avoid using kubeconfig.
if self.context is None:
kubectl_args += ['--kubeconfig', '/dev/null']
kubectl_args += [self.pod_name]
if ssh_mode == SshMode.LOGIN:
assert isinstance(cmd, list), 'cmd must be a list for login mode.'
Expand Down
Loading

0 comments on commit 76e20b6

Please sign in to comment.