Skip to content
This repository has been archived by the owner on Jan 14, 2024. It is now read-only.

Commit

Permalink
refactor: Extract code to kubernetes.py, use KubernetesError exce…
Browse files Browse the repository at this point in the history
…ption instead of `Exception`
  • Loading branch information
blackandred committed Jan 30, 2022
1 parent f7976e3 commit c94437e
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 136 deletions.
27 changes: 16 additions & 11 deletions bahub/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@ def from_container_not_running(container_id: str, status: str) -> 'DockerContain
return DockerContainerError('Container "{}" is not running but actually {}'.format(container_id, status))


class KubernetesError(TransportException):
@classmethod
def from_timed_out_waiting_for_pod(cls, pod_name: str, namespace: str) -> 'KubernetesError':
return cls(f"Timed out while waiting for pod '{pod_name}' in namespace '{namespace}'")

@classmethod
def cannot_scale_resource(cls, name: str, namespace: str, replicas: int):
return cls(f"Cannot achieve desired state of '{replicas}' replicas for '{name}' in '{namespace}' namespace")

@classmethod
def from_pod_creation_conflict(cls, pod_name: str):
return cls(f"POD '{pod_name}' already exists or is terminating, "
f"please wait a moment - cannot start process in parallel, "
f"it may break something")


class ConfigurationFactoryException(ApplicationException):
pass

Expand Down Expand Up @@ -79,17 +95,6 @@ def from_early_buffer_exit(stream_description: str) -> 'BufferingError':
return BufferingError('Buffering of stream "{}" ended earlier with error'.format(stream_description))


class CryptographyConfigurationError(ApplicationException):
pass


class CryptographyKeysAlreadyCreated(CryptographyConfigurationError):
@staticmethod
def from_keys_already_created(user_id: str) -> 'CryptographyConfigurationError':
return CryptographyKeysAlreadyCreated('Cryptography keys for "{uid}" already created, skipping creation'
.format(uid=user_id))


class BackupProcessError(ApplicationException):
"""Errors related to backup/restore streaming"""

Expand Down
112 changes: 110 additions & 2 deletions bahub/transports/kubernetes.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
"""
Generic Kubernetes methods for building Kubernetes transports
"""


import json
import os
import time
from typing import List, Callable
import yaml
from tempfile import TemporaryDirectory

from kubernetes.client import CoreV1Api, V1PodList, V1Pod, V1ObjectMeta, V1Scale, V1ScaleSpec, AppsV1Api, ApiException
from kubernetes.stream.ws_client import WSClient, ERROR_CHANNEL
from rkd.api.inputoutput import IO
from kubernetes import client
from kubernetes.stream import stream

from ..exception import KubernetesError
from ..fs import FilesystemInterface


Expand Down Expand Up @@ -93,6 +97,110 @@ def pod_exec(pod_name: str, namespace: str, cmd: List[str], io: IO) -> ExecResul
)


def find_pod_name(api: CoreV1Api, selector: str, namespace: str, io: IO) -> str:
"""
Returns a POD name
:raises: When no matching POD found
"""

pods: V1PodList = api.list_namespaced_pod(namespace, label_selector=selector, limit=1)

if len(pods.items) == 0:
raise Exception(f'No pods found matching selector {selector} in {namespace} namespace')

pod: V1Pod = pods.items[0]
pod_metadata: V1ObjectMeta = pod.metadata

io.debug(f"Found POD name: '{pod_metadata.name}' in namespace '{namespace}'")

return pod_metadata.name


def wait_for_pod_to_be_ready(api: CoreV1Api, pod_name: str, namespace: str, io: IO, timeout: int = 120):
"""
Waits for POD to reach a valid state
:raises: When timeout hits
"""

io.debug("Waiting for POD to be ready...")

for i in range(0, timeout):
pod: V1Pod = api.read_namespaced_pod(name=pod_name, namespace=namespace)

if pod.status.phase in ["Ready", "Healthy", "True", "Running"]:
_wait_for_pod_containers_to_be_ready(api, pod_name, namespace, timeout, io)
io.info(f"POD entered '{pod.status.phase}' state")
time.sleep(1)

return True

io.debug(f"Pod not ready. Status: {pod.status.phase}")
time.sleep(1)

raise KubernetesError.from_timed_out_waiting_for_pod(pod_name, namespace)


def _wait_for_pod_containers_to_be_ready(api: CoreV1Api, pod_name: str, namespace: str, timeout: int, io: IO):
"""
POD can be running, but containers could be still initializing - this method waits for containers
"""

for i in range(0, timeout):
pod: V1Pod = api.read_namespaced_pod(name=pod_name, namespace=namespace)

if all([(c.state.running and not c.state.waiting and not c.state.terminated)
for c in pod.status.container_statuses]):
io.info("All containers in a POD have started")
return


def scale_resource(api: AppsV1Api, name: str, namespace: str, replicas: int, io: IO):
"""
Scale down given Deployment/ReplicationController/StatefulSet
"""

io.info(f"Scaling deployment/{name} in {namespace} namespace to replicas '{replicas}'")
scale_spec = V1Scale(
metadata=V1ObjectMeta(name=name, namespace=namespace),
spec=V1ScaleSpec(
replicas=replicas
)
)
api.replace_namespaced_deployment_scale(name, namespace, scale_spec)

# then wait for it to be applied
for i in range(0, 3600):
deployment_as_dict = json.loads(api.read_namespaced_deployment(name=name, namespace=namespace,
_preload_content=False).data)

current_replicas_num = int(deployment_as_dict['spec']['replicas'])

if current_replicas_num == int(replicas):
io.info(f"POD's controller scaled to {replicas}")
return

io.debug(f"Waiting for cluster to scale POD's controller to {replicas}, currently: {current_replicas_num}")
time.sleep(1)

raise KubernetesError.cannot_scale_resource(name, namespace, replicas)


def create_pod(api: CoreV1Api, pod_name: str, namespace, specification: dict, io: IO):
io.info(f"Creating temporary POD '{pod_name}'")
specification['metadata']['name'] = pod_name

try:
api.create_namespaced_pod(namespace=namespace, body=specification)

except ApiException as e:
if e.reason == "Conflict" and "AlreadyExists" in str(e.body):
raise KubernetesError.from_pod_creation_conflict(pod_name) from e

raise e


class KubernetesPodFilesystem(FilesystemInterface):
io: IO
pod_name: str
Expand Down
74 changes: 12 additions & 62 deletions bahub/transports/kubernetes_podexec.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@
Performs `exec` operation into EXISTING, RUNNING POD to run a backup operation in-place.
"""

import time
from typing import List
from kubernetes import config, client
from kubernetes.client import V1PodList, V1Pod, V1ObjectMeta
from rkd.api.inputoutput import IO

from bahub.bin import RequiredBinary, copy_required_tools_from_controller_cache_to_target_env, \
copy_encryption_keys_from_controller_to_target_env
from bahub.exception import ConfigurationError
from bahub.settings import BIN_VERSION_CACHE_PATH, TARGET_ENV_BIN_PATH, TARGET_ENV_VERSIONS_PATH
from bahub.transports.base import TransportInterface, create_backup_maker_command
from bahub.transports.kubernetes import KubernetesPodFilesystem, pod_exec, ExecResult
from bahub.transports.kubernetes import KubernetesPodFilesystem, pod_exec, ExecResult, find_pod_name, \
wait_for_pod_to_be_ready
from bahub.transports.sh import LocalFilesystem


Expand All @@ -34,6 +33,7 @@ def __init__(self, spec: dict, io: IO):
super().__init__(spec, io)
self._namespace = spec.get('namespace', 'default')
self._selector = spec.get('selector', '')
self._timeout = int(spec.get('timeout', 120))

if not self._selector:
raise ConfigurationError("'selector' for Kubernetes type transport cannot be empty")
Expand All @@ -56,6 +56,11 @@ def get_specification_schema() -> dict:
"type": "string",
"example": "prod",
"default": "default"
},
"timeout": {
"type": "string",
"example": 120,
"default": 120
}
}
}
Expand Down Expand Up @@ -84,7 +89,7 @@ def schedule(self, command: str, definition, is_backup: bool, version: str = "")
Runs a `kubectl exec` on already existing POD
"""

pod_name = self.find_pod_name(self._selector, self._namespace)
pod_name = self._find_pod_name(self._selector, self._namespace)
self._execute_in_pod_when_pod_will_be_ready(pod_name, command, definition, is_backup, version)

def __exit__(self, exc_type, exc_val, exc_t) -> None:
Expand Down Expand Up @@ -113,7 +118,7 @@ def _execute_in_pod_when_pod_will_be_ready(self, pod_name: str, command: str, de
:return:
"""

self.wait_for_pod_to_be_ready(pod_name, self._namespace)
wait_for_pod_to_be_ready(self._v1_core_api, pod_name, self._namespace, io=self.io(), timeout=self._timeout)
self._prepare_environment_inside_pod(definition, pod_name)

complete_cmd = create_backup_maker_command(command, definition, is_backup, version,
Expand Down Expand Up @@ -162,63 +167,8 @@ def watch(self) -> bool:
self._process.watch(self.io().debug)
return self._process.has_exited_with_success()

def wait_for_pod_to_be_ready(self, pod_name: str, namespace: str, timeout: int = 120):
"""
Waits for POD to reach a valid state
:raises: When timeout hits
"""

self.io().debug("Waiting for POD to be ready...")

for i in range(0, timeout):
pod: V1Pod = self._v1_core_api.read_namespaced_pod(name=pod_name, namespace=namespace)

if pod.status.phase in ["Ready", "Healthy", "True", "Running"]:
self._wait_for_pod_containers_to_be_ready(pod_name, namespace, timeout)
self.io().info(f"POD entered '{pod.status.phase}' state")
time.sleep(1)

return True

self.io().debug(f"Pod not ready. Status: {pod.status.phase}")
time.sleep(1)

raise Exception(f"Timed out while waiting for pod '{pod_name}' in namespace '{namespace}'")

def find_pod_name(self, selector: str, namespace: str):
"""
Returns a POD name
:raises: When no matching POD found
"""

pods: V1PodList = self.v1_core_api.list_namespaced_pod(namespace,
label_selector=selector,
limit=1)

if len(pods.items) == 0:
raise Exception(f'No pods found matching selector {selector} in {namespace} namespace')

pod: V1Pod = pods.items[0]
pod_metadata: V1ObjectMeta = pod.metadata

self.io().debug(f"Found POD name: '{pod_metadata.name}' in namespace '{namespace}'")

return pod_metadata.name

def get_required_binaries(self):
return []

def _wait_for_pod_containers_to_be_ready(self, pod_name: str, namespace: str, timeout: int):
"""
POD can be running, but containers could be still initializing
"""

for i in range(0, timeout):
pod: V1Pod = self._v1_core_api.read_namespaced_pod(name=pod_name, namespace=namespace)

if all([(c.state.running and not c.state.waiting and not c.state.terminated)
for c in pod.status.container_statuses]):
self.io().info("All containers in a POD have started")
return
def _find_pod_name(self, selector: str, namespace: str) -> str:
return find_pod_name(self.v1_core_api, selector, namespace, self.io())
Loading

0 comments on commit c94437e

Please sign in to comment.