diff --git a/lightkube/codecs.py b/lightkube/codecs.py index a5861f4..1189e35 100644 --- a/lightkube/codecs.py +++ b/lightkube/codecs.py @@ -2,23 +2,22 @@ import yaml -from .generic_resource import GenericGlobalResource, GenericNamespacedResource, create_resources_from_crd +from .generic_resource import ( + GenericGlobalResource, + GenericNamespacedResource, + create_resources_from_crd, +) from .core.exceptions import LoadResourceError from .core.resource_registry import resource_registry -__all__ = [ - 'from_dict', - 'load_all_yaml', - 'dump_all_yaml', - 'resource_registry' -] +__all__ = ["from_dict", "load_all_yaml", "dump_all_yaml", "resource_registry"] try: import jinja2 except ImportError: jinja2 = None -REQUIRED_ATTR = ('apiVersion', 'kind') +REQUIRED_ATTR = ("apiVersion", "kind") AnyResource = Union[GenericGlobalResource, GenericNamespacedResource] @@ -35,16 +34,23 @@ def from_dict(d: dict) -> AnyResource: always required. """ if not isinstance(d, Mapping): - raise LoadResourceError(f"Invalid resource definition, not a dict.") + raise LoadResourceError("Invalid resource definition, not a dict.") for attr in REQUIRED_ATTR: if attr not in d: - raise LoadResourceError(f"Invalid resource definition, key '{attr}' missing.") + raise LoadResourceError( + f"Invalid resource definition, key '{attr}' missing." + ) - model = resource_registry.load(d['apiVersion'], d['kind']) + model = resource_registry.load(d["apiVersion"], d["kind"]) return model.from_dict(d) -def load_all_yaml(stream: Union[str, TextIO], context: dict = None, template_env = None, create_resources_for_crds: bool = False) -> List[AnyResource]: +def load_all_yaml( + stream: Union[str, TextIO], + context: dict = None, + template_env=None, + create_resources_for_crds: bool = False, +) -> List[AnyResource]: """Load kubernetes resource objects defined as YAML. See `from_dict` regarding how resource types are detected. Returns a list of resource objects or raise a `LoadResourceError`. 
Skips any empty YAML documents in the stream, returning an empty list if all YAML documents are empty. Deep parse any items from .*List resources. @@ -80,7 +86,10 @@ def _flatten(objects: Iterator) -> List[AnyResource]: res = from_dict(obj) resources.append(res) - if create_resources_for_crds is True and res.kind == "CustomResourceDefinition": + if ( + create_resources_for_crds is True + and res.kind == "CustomResourceDefinition" + ): create_resources_from_crd(res) return resources @@ -101,7 +110,9 @@ def dump_all_yaml(resources: List[AnyResource], stream: TextIO = None, indent=2) return yaml.safe_dump_all(res, stream, indent=indent) -def _template(stream: Union[str, TextIO], context: dict = None, template_env = None) -> List[AnyResource]: +def _template( + stream: Union[str, TextIO], context: dict = None, template_env=None +) -> List[AnyResource]: """ Template a stream using jinja2 and the given context """ @@ -113,5 +124,7 @@ def _template(stream: Union[str, TextIO], context: dict = None, template_env = N elif not isinstance(template_env, jinja2.Environment): raise LoadResourceError("template_env is not a valid jinja2 template") - tmpl = template_env.from_string(stream if isinstance(stream, str) else stream.read()) + tmpl = template_env.from_string( + stream if isinstance(stream, str) else stream.read() + ) return tmpl.render(**context) diff --git a/lightkube/config/client_adapter.py b/lightkube/config/client_adapter.py index 3a556a4..34ecc54 100644 --- a/lightkube/config/client_adapter.py +++ b/lightkube/config/client_adapter.py @@ -11,11 +11,15 @@ from ..core.exceptions import ConfigError -def Client(config: SingleConfig, timeout: httpx.Timeout, trust_env: bool = True) -> httpx.Client: +def Client( + config: SingleConfig, timeout: httpx.Timeout, trust_env: bool = True +) -> httpx.Client: return httpx.Client(**httpx_parameters(config, timeout, trust_env)) -def AsyncClient(config: SingleConfig, timeout: httpx.Timeout, trust_env: bool = True) -> httpx.AsyncClient: 
+def AsyncClient( + config: SingleConfig, timeout: httpx.Timeout, trust_env: bool = True +) -> httpx.AsyncClient: return httpx.AsyncClient(**httpx_parameters(config, timeout, trust_env)) @@ -41,18 +45,26 @@ def auth_flow(self, request: httpx.Request): async def async_check_output(command, env): PIPE = asyncio.subprocess.PIPE - proc = await asyncio.create_subprocess_exec(*command, env=env, stdin=None, stdout=PIPE, stderr=PIPE) + proc = await asyncio.create_subprocess_exec( + *command, env=env, stdin=None, stdout=PIPE, stderr=PIPE + ) stdout, stderr = await proc.communicate() if proc.returncode != 0: - raise ConfigError(f"Exec {command[0]} returned {proc.returncode}: {stderr.decode()}") + raise ConfigError( + f"Exec {command[0]} returned {proc.returncode}: {stderr.decode()}" + ) return stdout def sync_check_output(command, env): - proc = subprocess.Popen(command, env=env, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc = subprocess.Popen( + command, env=env, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) stdout, stderr = proc.communicate() if proc.returncode != 0: - raise ConfigError(f"Exec {command[0]} returned {proc.returncode}: {stderr.decode()}") + raise ConfigError( + f"Exec {command[0]} returned {proc.returncode}: {stderr.decode()}" + ) return stdout @@ -63,8 +75,13 @@ def __init__(self, exec: UserExec): def _prepare(self): exec = self._exec - if exec.apiVersion not in ("client.authentication.k8s.io/v1alpha1", "client.authentication.k8s.io/v1beta1"): - raise ConfigError(f"auth exec api version {exec.apiVersion} not implemented") + if exec.apiVersion not in ( + "client.authentication.k8s.io/v1alpha1", + "client.authentication.k8s.io/v1beta1", + ): + raise ConfigError( + f"auth exec api version {exec.apiVersion} not implemented" + ) cmd_env_vars = dict(os.environ) cmd_env_vars.update((var.name, var.value) for var in exec.env) # TODO: add support for passing KUBERNETES_EXEC_INFO env var @@ -121,7 +138,7 @@ def user_cert(user: 
User, abs_file): if user.client_cert or user.client_cert_data: return ( FileStr(user.client_cert_data) or abs_file(user.client_cert), - FileStr(user.client_key_data) or abs_file(user.client_key) + FileStr(user.client_key_data) or abs_file(user.client_key), ) return None diff --git a/lightkube/config/kubeconfig.py b/lightkube/config/kubeconfig.py index f123bb5..157a32e 100644 --- a/lightkube/config/kubeconfig.py +++ b/lightkube/config/kubeconfig.py @@ -1,7 +1,5 @@ import os import yaml -import tempfile -import base64 from typing import Dict, NamedTuple, Optional from pathlib import Path @@ -21,7 +19,7 @@ def to_mapping(obj_list, key, factory): - return {obj['name']: factory.from_dict(obj[key], lazy=False) for obj in obj_list} + return {obj["name"]: factory.from_dict(obj[key], lazy=False) for obj in obj_list} DEFAULT_NAMESPACE = "default" @@ -45,24 +43,30 @@ def abs_file(self, fname): return fname if self.fname is None: - raise exceptions.ConfigError(f"{fname} is relative, but kubeconfig path unknown") + raise exceptions.ConfigError( + f"{fname} is relative, but kubeconfig path unknown" + ) return self.fname.parent.joinpath(fname) PROXY_CONF = SingleConfig( - context_name="default", context=Context(cluster="default"), - cluster=Cluster(server="http://localhost:8080") + context_name="default", + context=Context(cluster="default"), + cluster=Cluster(server="http://localhost:8080"), ) class KubeConfig: """Class to represent a kubeconfig. See the specific constructors depending on your use case.""" + clusters: Dict[str, Cluster] users: Dict[str, User] contexts: Dict[str, Context] - def __init__(self, *, clusters, contexts, users=None, current_context=None, fname=None): + def __init__( + self, *, clusters, contexts, users=None, current_context=None, fname=None + ): """ Create the kubernetes configuration manually. Normally this constructor should not be called directly. Use a specific constructor instead. 
@@ -92,14 +96,16 @@ def from_dict(cls, conf: Dict, fname=None): present inside the configuration. """ return cls( - current_context=conf.get('current-context'), - clusters=to_mapping(conf['clusters'], 'cluster', factory=Cluster), - contexts=to_mapping(conf['contexts'], 'context', factory=Context), - users=to_mapping(conf.get('users', []), 'user', factory=User), - fname=fname + current_context=conf.get("current-context"), + clusters=to_mapping(conf["clusters"], "cluster", factory=Cluster), + contexts=to_mapping(conf["contexts"], "context", factory=Context), + users=to_mapping(conf.get("users", []), "user", factory=User), + fname=fname, ) - def get(self, context_name=None, default: SingleConfig = None) -> Optional[SingleConfig]: + def get( + self, context_name=None, default: SingleConfig = None + ) -> Optional[SingleConfig]: """Returns a `SingleConfig` instance, representing the configuration matching the given `context_name`. Lightkube client will automatically call this method without parameters when an instance of `KubeConfig` is provided. 
@@ -116,17 +122,20 @@ def get(self, context_name=None, default: SingleConfig = None) -> Optional[Singl context_name = self.current_context if context_name is None: if default is None: - raise exceptions.ConfigError("No current context set and no default provided") + raise exceptions.ConfigError( + "No current context set and no default provided" + ) return default try: ctx = self.contexts[context_name] except KeyError: raise exceptions.ConfigError(f"Context '{context_name}' not found") return SingleConfig( - context_name=context_name, context=ctx, + context_name=context_name, + context=ctx, cluster=self.clusters[ctx.cluster], user=self.users[ctx.user] if ctx.user else None, - fname=self.fname + fname=self.fname, ) @classmethod @@ -144,15 +153,21 @@ def from_file(cls, fname): return cls.from_dict(yaml.safe_load(f.read()), fname=filepath) @classmethod - def from_one(cls, *, cluster, user=None, context_name='default', namespace=None, fname=None): + def from_one( + cls, *, cluster, user=None, context_name="default", namespace=None, fname=None + ): """Creates an instance of the KubeConfig class from one cluster and one user configuration""" - context = Context(cluster=context_name, user=context_name if user else None, namespace=namespace) + context = Context( + cluster=context_name, + user=context_name if user else None, + namespace=namespace, + ) return cls( clusters={context_name: cluster}, contexts={context_name: context}, users={context_name: user} if user else None, current_context=context_name, - fname=fname + fname=fname, ) @classmethod @@ -179,35 +194,36 @@ def from_service_account(cls, service_account=SERVICE_ACCOUNT): host = os.environ["KUBERNETES_SERVICE_HOST"] port = os.environ["KUBERNETES_SERVICE_PORT"] - if ":" in host: # ipv6 + if ":" in host: # ipv6 host = f"[{host}]" return cls.from_one( cluster=Cluster( server=f"https://{host}:{port}", - certificate_auth=str(account_dir.joinpath("ca.crt")) + certificate_auth=str(account_dir.joinpath("ca.crt")), ), 
user=User(token=token), - namespace=namespace + namespace=namespace, ) @classmethod - def from_env(cls, service_account=SERVICE_ACCOUNT, default_config=DEFAULT_KUBECONFIG): + def from_env( + cls, service_account=SERVICE_ACCOUNT, default_config=DEFAULT_KUBECONFIG + ): """Attempts to load the configuration automatically looking at the environment and filesystem. - The method will attempt to load a configuration using the following order: + The method will attempt to load a configuration using the following order: - * in-cluster config. - * config file defined in `KUBECONFIG` environment variable. - * configuration file present on the default location. + * in-cluster config. + * config file defined in `KUBECONFIG` environment variable. + * configuration file present on the default location. - **Parameters** + **Parameters** - * **service_account**: Allows to override the default service account directory path. - Default `/var/run/secrets/kubernetes.io/serviceaccount`. - * **default_config**: Allows to override the default configuration location. Default `~/.kube/config`. - """ + * **service_account**: Allows to override the default service account directory path. + Default `/var/run/secrets/kubernetes.io/serviceaccount`. + * **default_config**: Allows to override the default configuration location. Default `~/.kube/config`. 
+ """ try: return KubeConfig.from_service_account(service_account=service_account) except exceptions.ConfigError: - return KubeConfig.from_file(os.environ.get('KUBECONFIG', default_config)) - + return KubeConfig.from_file(os.environ.get("KUBECONFIG", default_config)) diff --git a/lightkube/config/models.py b/lightkube/config/models.py index 4874cb5..f3ec972 100644 --- a/lightkube/config/models.py +++ b/lightkube/config/models.py @@ -52,16 +52,22 @@ class User(DataclassDictMixIn): username: str = None password: str = None token: str = None - auth_provider: Dict = field(metadata={'json': 'auth-provider'}, default=None) - client_cert: str = field(metadata={'json': 'client-certificate'}, default=None) - client_cert_data: str = field(metadata={'json': 'client-certificate-data'}, default=None) - client_key: str = field(metadata={'json': 'client-key'}, default=None) - client_key_data: str = field(metadata={'json': 'client-key-data'}, default=None) + auth_provider: Dict = field(metadata={"json": "auth-provider"}, default=None) + client_cert: str = field(metadata={"json": "client-certificate"}, default=None) + client_cert_data: str = field( + metadata={"json": "client-certificate-data"}, default=None + ) + client_key: str = field(metadata={"json": "client-key"}, default=None) + client_key_data: str = field(metadata={"json": "client-key-data"}, default=None) @dataclass class Cluster(DataclassDictMixIn): server: str = "http://localhost:8080" - certificate_auth: str = field(metadata={'json': 'certificate-authority'}, default=None) - certificate_auth_data: str = field(metadata={'json': 'certificate-authority-data'}, default=None) - insecure: bool = field(metadata={'json': 'insecure-skip-tls-verify'}, default=False) + certificate_auth: str = field( + metadata={"json": "certificate-authority"}, default=None + ) + certificate_auth_data: str = field( + metadata={"json": "certificate-authority-data"}, default=None + ) + insecure: bool = field(metadata={"json": 
"insecure-skip-tls-verify"}, default=False) diff --git a/lightkube/core/async_client.py b/lightkube/core/async_client.py index 3adaa40..db13cf4 100644 --- a/lightkube/core/async_client.py +++ b/lightkube/core/async_client.py @@ -1,16 +1,31 @@ -from typing import Type, Iterator, TypeVar, Union, overload, Dict, Tuple, List, Iterable, AsyncIterable +from typing import ( + Type, + Union, + overload, + Dict, + Tuple, + List, + Iterable, + AsyncIterable, +) import httpx from ..config.kubeconfig import SingleConfig, KubeConfig -from .. import operators from ..core import resource as r -from .generic_client import GenericSyncClient, GenericAsyncClient +from .generic_client import GenericAsyncClient from ..core.exceptions import ConditionError, ObjectDeleted from ..types import OnErrorHandler, PatchType, CascadeType, on_error_raise from .internal_resources import core_v1 from .selector import build_selector -from .client import NamespacedResource, GlobalResource, GlobalSubResource, NamespacedSubResource, \ - AllNamespacedResource, Resource, LabelValue, FieldValue, LabelSelector, FieldSelector +from .client import ( + NamespacedResource, + GlobalResource, + GlobalSubResource, + NamespacedSubResource, + AllNamespacedResource, + LabelSelector, + FieldSelector, +) class AsyncClient: @@ -188,12 +203,12 @@ async def deletecollection( ) @overload - async def get(self, res: Type[GlobalResource], name: str) -> GlobalResource: - ... + async def get(self, res: Type[GlobalResource], name: str) -> GlobalResource: ... @overload - async def get(self, res: Type[AllNamespacedResource], name: str, *, namespace: str = None) -> AllNamespacedResource: - ... + async def get( + self, res: Type[AllNamespacedResource], name: str, *, namespace: str = None + ) -> AllNamespacedResource: ... async def get(self, res, name, *, namespace=None): """Return an object @@ -204,18 +219,30 @@ async def get(self, res, name, *, namespace=None): * **name** - Name of the object to fetch. 
* **namespace** - *(optional)* Name of the namespace containing the object (Only for namespaced resources). """ - return await self._client.request("get", res=res, name=name, namespace=namespace) + return await self._client.request( + "get", res=res, name=name, namespace=namespace + ) @overload - def list(self, res: Type[GlobalResource], *, chunk_size: int = None, labels: LabelSelector = None, fields: FieldSelector = None) -> \ - AsyncIterable[GlobalResource]: - ... + def list( + self, + res: Type[GlobalResource], + *, + chunk_size: int = None, + labels: LabelSelector = None, + fields: FieldSelector = None, + ) -> AsyncIterable[GlobalResource]: ... @overload - def list(self, res: Type[NamespacedResource], *, namespace: str = None, chunk_size: int = None, - labels: LabelSelector = None, fields: FieldSelector = None) -> \ - AsyncIterable[NamespacedResource]: - ... + def list( + self, + res: Type[NamespacedResource], + *, + namespace: str = None, + chunk_size: int = None, + labels: LabelSelector = None, + fields: FieldSelector = None, + ) -> AsyncIterable[NamespacedResource]: ... def list(self, res, *, namespace=None, chunk_size=None, labels=None, fields=None): """Return an iterator of objects matching the selection criteria. 
@@ -231,31 +258,55 @@ def list(self, res, *, namespace=None, chunk_size=None, labels=None, fields=None """ br = self._client.prepare_request( - 'list', res=res, namespace=namespace, + "list", + res=res, + namespace=namespace, params={ - 'limit': chunk_size, - 'labelSelector': build_selector(labels) if labels else None, - 'fieldSelector': build_selector(fields, for_fields=True) if fields else None - } + "limit": chunk_size, + "labelSelector": build_selector(labels) if labels else None, + "fieldSelector": ( + build_selector(fields, for_fields=True) if fields else None + ), + }, ) return self._client.list(br) @overload - def watch(self, res: Type[GlobalResource], *, labels: LabelSelector = None, fields: FieldSelector = None, - server_timeout: int = None, - resource_version: str = None, on_error: OnErrorHandler = on_error_raise) -> \ - AsyncIterable[Tuple[str, GlobalResource]]: - ... + def watch( + self, + res: Type[GlobalResource], + *, + labels: LabelSelector = None, + fields: FieldSelector = None, + server_timeout: int = None, + resource_version: str = None, + on_error: OnErrorHandler = on_error_raise, + ) -> AsyncIterable[Tuple[str, GlobalResource]]: ... @overload - def watch(self, res: Type[NamespacedResource], *, namespace: str = None, - labels: LabelSelector = None, fields: FieldSelector = None, - server_timeout: int = None, resource_version: str = None, - on_error: OnErrorHandler = on_error_raise) -> \ - AsyncIterable[Tuple[str, NamespacedResource]]: - ... - - def watch(self, res, *, namespace=None, labels=None, fields=None, server_timeout=None, resource_version=None, on_error=on_error_raise): + def watch( + self, + res: Type[NamespacedResource], + *, + namespace: str = None, + labels: LabelSelector = None, + fields: FieldSelector = None, + server_timeout: int = None, + resource_version: str = None, + on_error: OnErrorHandler = on_error_raise, + ) -> AsyncIterable[Tuple[str, NamespacedResource]]: ... 
+ + def watch( + self, + res, + *, + namespace=None, + labels=None, + fields=None, + server_timeout=None, + resource_version=None, + on_error=on_error_raise, + ): """Watch changes to objects **parameters** @@ -271,13 +322,19 @@ def watch(self, res, *, namespace=None, labels=None, fields=None, server_timeout * **on_error** - *(optional)* Function that control what to do in case of errors. The default implementation will raise any error. """ - br = self._client.prepare_request("list", res=res, namespace=namespace, watch=True, + br = self._client.prepare_request( + "list", + res=res, + namespace=namespace, + watch=True, params={ - 'timeoutSeconds': server_timeout, - 'resourceVersion': resource_version, - 'labelSelector': build_selector(labels) if labels else None, - 'fieldSelector': build_selector(fields, for_fields=True) if fields else None - } + "timeoutSeconds": server_timeout, + "resourceVersion": resource_version, + "labelSelector": build_selector(labels) if labels else None, + "fieldSelector": ( + build_selector(fields, for_fields=True) if fields else None + ), + }, ) return self._client.watch(br, on_error=on_error) @@ -289,8 +346,7 @@ async def wait( *, for_conditions: Iterable[str], raise_for_conditions: Iterable[str] = (), - ) -> GlobalResource: - ... + ) -> GlobalResource: ... @overload async def wait( @@ -301,8 +357,7 @@ async def wait( for_conditions: Iterable[str], namespace: str = None, raise_for_conditions: Iterable[str] = (), - ) -> AllNamespacedResource: - ... + ) -> AllNamespacedResource: ... 
async def wait( self, @@ -325,12 +380,12 @@ async def wait( """ kind = r.api_info(res).plural - full_name = f'{kind}/{name}' + full_name = f"{kind}/{name}" for_conditions = list(for_conditions) raise_for_conditions = list(raise_for_conditions) - watch = self.watch(res, namespace=namespace, fields={'metadata.name': name}) + watch = self.watch(res, namespace=namespace, fields={"metadata.name": name}) try: async for op, obj in watch: @@ -345,14 +400,18 @@ async def wait( except AttributeError: status = obj.status - conditions = [c for c in status.get('conditions', []) if c['status'] == 'True'] - if any(c['type'] in for_conditions for c in conditions): + conditions = [ + c for c in status.get("conditions", []) if c["status"] == "True" + ] + if any(c["type"] in for_conditions for c in conditions): return obj - failures = [c for c in conditions if c['type'] in raise_for_conditions] + failures = [c for c in conditions if c["type"] in raise_for_conditions] if failures: - raise ConditionError(full_name, [f.get('message', f['type']) for f in failures]) + raise ConditionError( + full_name, [f.get("message", f["type"]) for f in failures] + ) finally: # we ensure the async generator is closed before returning await watch.aclose() @@ -569,12 +628,31 @@ async def replace( ) @overload - def log(self, name:str, *, namespace: str = None, container: str = None, follow: bool = False, - since: int = None, tail_lines: int = None, timestamps: bool = False, newlines: bool = True) -> AsyncIterable[str]: - ... - - def log(self, name, *, namespace=None, container=None, follow=False, - since=None, tail_lines=None, timestamps=False, newlines=True): + def log( + self, + name: str, + *, + namespace: str = None, + container: str = None, + follow: bool = False, + since: int = None, + tail_lines: int = None, + timestamps: bool = False, + newlines: bool = True, + ) -> AsyncIterable[str]: ... 
+ + def log( + self, + name, + *, + namespace=None, + container=None, + follow=False, + since=None, + tail_lines=None, + timestamps=False, + newlines=True, + ): """Return log lines for the given Pod **parameters** @@ -589,16 +667,26 @@ def log(self, name, *, namespace=None, container=None, follow=False, * **newlines** - *(optional)* If `True`, each line will end with a newline, otherwise the newlines will be stripped. """ br = self._client.prepare_request( - 'get', core_v1.PodLog, name=name, namespace=namespace, - params={'timestamps': timestamps, 'tailLines': tail_lines, 'container': container, - 'sinceSeconds': since, 'follow': follow}) + "get", + core_v1.PodLog, + name=name, + namespace=namespace, + params={ + "timestamps": timestamps, + "tailLines": tail_lines, + "container": container, + "sinceSeconds": since, + "follow": follow, + }, + ) req = self._client.build_adapter_request(br) async def stream_log(): resp = await self._client.send(req, stream=follow) self._client.raise_for_status(resp) async for line in resp.aiter_lines(): - yield line + '\n' if newlines else line + yield line + "\n" if newlines else line + return stream_log() @overload @@ -667,14 +755,25 @@ async def apply( be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server` to `kubectl` commands. 
""" - if namespace is None and isinstance(obj, r.NamespacedResource) and obj.metadata.namespace: + if ( + namespace is None + and isinstance(obj, r.NamespacedResource) + and obj.metadata.namespace + ): namespace = obj.metadata.namespace if name is None and obj.metadata.name: name = obj.metadata.name - return await self.patch(type(obj), name, obj, namespace=namespace, - patch_type=PatchType.APPLY, field_manager=field_manager, force=force, dry_run=dry_run) + return await self.patch( + type(obj), + name, + obj, + namespace=namespace, + patch_type=PatchType.APPLY, + field_manager=field_manager, + force=force, + dry_run=dry_run, + ) async def close(self): """Close the underline httpx client""" await self._client.close() - diff --git a/lightkube/core/client.py b/lightkube/core/client.py index fa61c07..f845772 100644 --- a/lightkube/core/client.py +++ b/lightkube/core/client.py @@ -1,20 +1,32 @@ -from typing import Type, Iterator, TypeVar, Union, overload, Dict, Tuple, List, Iterable, AsyncIterable +from typing import ( + Type, + Iterator, + TypeVar, + Union, + overload, + Dict, + Tuple, + List, + Iterable, +) import httpx from ..config.kubeconfig import SingleConfig, KubeConfig from .. 
import operators from ..core import resource as r -from .generic_client import GenericSyncClient, GenericAsyncClient +from .generic_client import GenericSyncClient from ..core.exceptions import ConditionError, ObjectDeleted from ..types import OnErrorHandler, PatchType, CascadeType, on_error_raise from .internal_resources import core_v1 from .selector import build_selector -NamespacedResource = TypeVar('NamespacedResource', bound=r.NamespacedResource) -GlobalResource = TypeVar('GlobalResource', bound=r.GlobalResource) -GlobalSubResource = TypeVar('GlobalSubResource', bound=r.GlobalSubResource) -NamespacedSubResource = TypeVar('NamespacedSubResource', bound=r.NamespacedSubResource) -AllNamespacedResource = TypeVar('AllNamespacedResource', bound=Union[r.NamespacedResource, r.NamespacedSubResource]) -Resource = TypeVar('Resource', bound=r.Resource) +NamespacedResource = TypeVar("NamespacedResource", bound=r.NamespacedResource) +GlobalResource = TypeVar("GlobalResource", bound=r.GlobalResource) +GlobalSubResource = TypeVar("GlobalSubResource", bound=r.GlobalSubResource) +NamespacedSubResource = TypeVar("NamespacedSubResource", bound=r.NamespacedSubResource) +AllNamespacedResource = TypeVar( + "AllNamespacedResource", bound=Union[r.NamespacedResource, r.NamespacedSubResource] +) +Resource = TypeVar("Resource", bound=r.Resource) LabelValue = Union[str, None, operators.Operator, Iterable] FieldValue = Union[str, operators.BinaryOperator, operators.SequenceOperator] LabelSelector = Dict[str, LabelValue] @@ -196,12 +208,12 @@ def deletecollection( ) @overload - def get(self, res: Type[GlobalResource], name: str) -> GlobalResource: - ... + def get(self, res: Type[GlobalResource], name: str) -> GlobalResource: ... @overload - def get(self, res: Type[AllNamespacedResource], name: str, *, namespace: str = None) -> AllNamespacedResource: - ... + def get( + self, res: Type[AllNamespacedResource], name: str, *, namespace: str = None + ) -> AllNamespacedResource: ... 
def get(self, res, name, *, namespace=None): """Return an object @@ -215,15 +227,25 @@ def get(self, res, name, *, namespace=None): return self._client.request("get", res=res, name=name, namespace=namespace) @overload - def list(self, res: Type[GlobalResource], *, chunk_size: int = None, labels: LabelSelector = None, fields: FieldSelector = None) -> \ - Iterator[GlobalResource]: - ... + def list( + self, + res: Type[GlobalResource], + *, + chunk_size: int = None, + labels: LabelSelector = None, + fields: FieldSelector = None, + ) -> Iterator[GlobalResource]: ... @overload - def list(self, res: Type[NamespacedResource], *, namespace: str = None, chunk_size: int = None, - labels: LabelSelector = None, fields: FieldSelector = None) -> \ - Iterator[NamespacedResource]: - ... + def list( + self, + res: Type[NamespacedResource], + *, + namespace: str = None, + chunk_size: int = None, + labels: LabelSelector = None, + fields: FieldSelector = None, + ) -> Iterator[NamespacedResource]: ... def list(self, res, *, namespace=None, chunk_size=None, labels=None, fields=None): """Return an iterator of objects matching the selection criteria. 
@@ -239,31 +261,55 @@ def list(self, res, *, namespace=None, chunk_size=None, labels=None, fields=None """ br = self._client.prepare_request( - 'list', res=res, namespace=namespace, + "list", + res=res, + namespace=namespace, params={ - 'limit': chunk_size, - 'labelSelector': build_selector(labels) if labels else None, - 'fieldSelector': build_selector(fields, for_fields=True) if fields else None - } + "limit": chunk_size, + "labelSelector": build_selector(labels) if labels else None, + "fieldSelector": ( + build_selector(fields, for_fields=True) if fields else None + ), + }, ) return self._client.list(br) @overload - def watch(self, res: Type[GlobalResource], *, labels: LabelSelector = None, fields: FieldSelector = None, - server_timeout: int = None, - resource_version: str = None, on_error: OnErrorHandler = on_error_raise) -> \ - Iterator[Tuple[str, GlobalResource]]: - ... + def watch( + self, + res: Type[GlobalResource], + *, + labels: LabelSelector = None, + fields: FieldSelector = None, + server_timeout: int = None, + resource_version: str = None, + on_error: OnErrorHandler = on_error_raise, + ) -> Iterator[Tuple[str, GlobalResource]]: ... @overload - def watch(self, res: Type[NamespacedResource], *, namespace: str = None, - labels: LabelSelector = None, fields: FieldSelector = None, - server_timeout: int = None, resource_version: str = None, - on_error: OnErrorHandler = on_error_raise) -> \ - Iterator[Tuple[str, NamespacedResource]]: - ... - - def watch(self, res, *, namespace=None, labels=None, fields=None, server_timeout=None, resource_version=None, on_error=on_error_raise): + def watch( + self, + res: Type[NamespacedResource], + *, + namespace: str = None, + labels: LabelSelector = None, + fields: FieldSelector = None, + server_timeout: int = None, + resource_version: str = None, + on_error: OnErrorHandler = on_error_raise, + ) -> Iterator[Tuple[str, NamespacedResource]]: ... 
+ + def watch( + self, + res, + *, + namespace=None, + labels=None, + fields=None, + server_timeout=None, + resource_version=None, + on_error=on_error_raise, + ): """Watch changes to objects **parameters** @@ -279,13 +325,19 @@ def watch(self, res, *, namespace=None, labels=None, fields=None, server_timeout * **on_error** - *(optional)* Function that control what to do in case of errors. The default implementation will raise any error. """ - br = self._client.prepare_request("list", res=res, namespace=namespace, watch=True, + br = self._client.prepare_request( + "list", + res=res, + namespace=namespace, + watch=True, params={ - 'timeoutSeconds': server_timeout, - 'resourceVersion': resource_version, - 'labelSelector': build_selector(labels) if labels else None, - 'fieldSelector': build_selector(fields, for_fields=True) if fields else None - } + "timeoutSeconds": server_timeout, + "resourceVersion": resource_version, + "labelSelector": build_selector(labels) if labels else None, + "fieldSelector": ( + build_selector(fields, for_fields=True) if fields else None + ), + }, ) return self._client.watch(br, on_error=on_error) @@ -297,8 +349,7 @@ def wait( *, for_conditions: Iterable[str], raise_for_conditions: Iterable[str] = (), - ) -> GlobalResource: - ... + ) -> GlobalResource: ... @overload def wait( @@ -309,8 +360,7 @@ def wait( for_conditions: Iterable[str], namespace: str = None, raise_for_conditions: Iterable[str] = (), - ) -> AllNamespacedResource: - ... + ) -> AllNamespacedResource: ... 
def wait( self, @@ -333,12 +383,14 @@ def wait( """ kind = r.api_info(res).plural - full_name = f'{kind}/{name}' + full_name = f"{kind}/{name}" for_conditions = list(for_conditions) raise_for_conditions = list(raise_for_conditions) - for op, obj in self.watch(res, namespace=namespace, fields={'metadata.name': name}): + for op, obj in self.watch( + res, namespace=namespace, fields={"metadata.name": name} + ): if obj.status is None: continue @@ -350,14 +402,18 @@ def wait( except AttributeError: status = obj.status - conditions = [c for c in status.get('conditions', []) if c['status'] == 'True'] - if any(c['type'] in for_conditions for c in conditions): + conditions = [ + c for c in status.get("conditions", []) if c["status"] == "True" + ] + if any(c["type"] in for_conditions for c in conditions): return obj - failures = [c for c in conditions if c['type'] in raise_for_conditions] + failures = [c for c in conditions if c["type"] in raise_for_conditions] if failures: - raise ConditionError(full_name, [f.get('message', f['type']) for f in failures]) + raise ConditionError( + full_name, [f.get("message", f["type"]) for f in failures] + ) @overload def patch( @@ -502,23 +558,44 @@ def create( ) @overload - def replace(self, obj: GlobalSubResource, name: str, field_manager: str = None, dry_run: bool = False) -> GlobalSubResource: - ... + def replace( + self, + obj: GlobalSubResource, + name: str, + field_manager: str = None, + dry_run: bool = False, + ) -> GlobalSubResource: ... @overload - def replace(self, obj: NamespacedSubResource, name: str, *, namespace: str = None, field_manager: str = None, dry_run: bool = False) \ - -> NamespacedSubResource: - ... + def replace( + self, + obj: NamespacedSubResource, + name: str, + *, + namespace: str = None, + field_manager: str = None, + dry_run: bool = False, + ) -> NamespacedSubResource: ... @overload - def replace(self, obj: GlobalResource, field_manager: str = None, dry_run: bool = False) -> GlobalResource: - ... 
+ def replace( + self, obj: GlobalResource, field_manager: str = None, dry_run: bool = False + ) -> GlobalResource: ... @overload - def replace(self, obj: NamespacedResource, field_manager: str = None, dry_run: bool = False) -> NamespacedResource: - ... + def replace( + self, obj: NamespacedResource, field_manager: str = None, dry_run: bool = False + ) -> NamespacedResource: ... - def replace(self, obj, name=None, *, namespace=None, field_manager=None, dry_run: bool = False): + def replace( + self, + obj, + name=None, + *, + namespace=None, + field_manager=None, + dry_run: bool = False, + ): """Replace an existing resource. **parameters** @@ -532,16 +609,43 @@ def replace(self, obj, name=None, *, namespace=None, field_manager=None, dry_run be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server` to `kubectl` commands. """ - return self._client.request("put", name=name, namespace=namespace, obj=obj, - params={'fieldManager': field_manager, "dryRun": "All" if dry_run else None}) + return self._client.request( + "put", + name=name, + namespace=namespace, + obj=obj, + params={ + "fieldManager": field_manager, + "dryRun": "All" if dry_run else None, + }, + ) @overload - def log(self, name:str, *, namespace: str = None, container: str = None, follow: bool = False, - since: int = None, tail_lines: int = None, timestamps: bool = False, newlines: bool = True) -> Iterator[str]: - ... - - def log(self, name, *, namespace=None, container=None, follow=False, - since=None, tail_lines=None, timestamps=False, newlines=True): + def log( + self, + name: str, + *, + namespace: str = None, + container: str = None, + follow: bool = False, + since: int = None, + tail_lines: int = None, + timestamps: bool = False, + newlines: bool = True, + ) -> Iterator[str]: ... 
+ + def log( + self, + name, + *, + namespace=None, + container=None, + follow=False, + since=None, + tail_lines=None, + timestamps=False, + newlines=True, + ): """Return log lines for the given Pod **parameters** @@ -556,33 +660,74 @@ def log(self, name, *, namespace=None, container=None, follow=False, * **newlines** - *(optional)* If `True`, each line will end with a newline, otherwise the newlines will be stripped. """ br = self._client.prepare_request( - 'get', core_v1.PodLog, name=name, namespace=namespace, - params={'timestamps': timestamps, 'tailLines': tail_lines, 'container': container, - 'sinceSeconds': since, 'follow': follow}) + "get", + core_v1.PodLog, + name=name, + namespace=namespace, + params={ + "timestamps": timestamps, + "tailLines": tail_lines, + "container": container, + "sinceSeconds": since, + "follow": follow, + }, + ) req = self._client.build_adapter_request(br) resp = self._client.send(req, stream=follow) self._client.raise_for_status(resp) - return (l + '\n' if newlines else l for l in resp.iter_lines()) + return (l + "\n" if newlines else l for l in resp.iter_lines()) @overload - def apply(self, obj: GlobalSubResource, name: str, *, field_manager: str = None, force: bool = False, dry_run: bool = False) \ - -> GlobalSubResource: - ... + def apply( + self, + obj: GlobalSubResource, + name: str, + *, + field_manager: str = None, + force: bool = False, + dry_run: bool = False, + ) -> GlobalSubResource: ... @overload - def apply(self, obj: NamespacedSubResource, name: str, *, namespace: str = None, - field_manager: str = None, force: bool = False, dry_run: bool = False) -> NamespacedSubResource: - ... + def apply( + self, + obj: NamespacedSubResource, + name: str, + *, + namespace: str = None, + field_manager: str = None, + force: bool = False, + dry_run: bool = False, + ) -> NamespacedSubResource: ... 
@overload - def apply(self, obj: GlobalResource, field_manager: str = None, force: bool = False, dry_run: bool = False) -> GlobalResource: - ... + def apply( + self, + obj: GlobalResource, + field_manager: str = None, + force: bool = False, + dry_run: bool = False, + ) -> GlobalResource: ... @overload - def apply(self, obj: NamespacedResource, field_manager: str = None, force: bool = False, dry_run: bool = False) -> NamespacedResource: - ... + def apply( + self, + obj: NamespacedResource, + field_manager: str = None, + force: bool = False, + dry_run: bool = False, + ) -> NamespacedResource: ... - def apply(self, obj, name=None, *, namespace=None, field_manager=None, force=False, dry_run=False): + def apply( + self, + obj, + name=None, + *, + namespace=None, + field_manager=None, + force=False, + dry_run=False, + ): """Create or configure an object. This method uses the [server-side apply](https://kubernetes.io/docs/reference/using-api/server-side-apply/) functionality. @@ -598,10 +743,21 @@ def apply(self, obj, name=None, *, namespace=None, field_manager=None, force=Fal be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server` to `kubectl` commands. 
""" - if namespace is None and isinstance(obj, r.NamespacedResource) and obj.metadata.namespace: + if ( + namespace is None + and isinstance(obj, r.NamespacedResource) + and obj.metadata.namespace + ): namespace = obj.metadata.namespace if name is None and obj.metadata.name: name = obj.metadata.name - return self.patch(type(obj), name, obj, namespace=namespace, - patch_type=PatchType.APPLY, field_manager=field_manager, force=force, dry_run=dry_run) - + return self.patch( + type(obj), + name, + obj, + namespace=namespace, + patch_type=PatchType.APPLY, + field_manager=field_manager, + force=force, + dry_run=dry_run, + ) diff --git a/lightkube/core/dataclasses_dict.py b/lightkube/core/dataclasses_dict.py index f66e375..d37638a 100644 --- a/lightkube/core/dataclasses_dict.py +++ b/lightkube/core/dataclasses_dict.py @@ -1,4 +1,3 @@ -import sys import typing from typing import Union from datetime import datetime @@ -51,18 +50,22 @@ def nohop(x, kw): def is_dataclass_json(cls): return dc.is_dataclass(cls) and issubclass(cls, DataclassDictMixIn) + NoneType = type(None) + def _remove_optional(tp): - if get_origin(tp) is Union: - args = get_args(tp) - if args[1] is NoneType: - return args[0] - return tp + if get_origin(tp) is Union: + args = get_args(tp) + if args[1] is NoneType: + return args[0] + return tp + def get_type_hints(cl): - types = typing.get_type_hints(cl) - return {k: _remove_optional(v) for k, v in types.items()} + types = typing.get_type_hints(cl) + return {k: _remove_optional(v) for k, v in types.items()} + def extract_types(cls, is_to=True): func_name = "to_json_type" if is_to else "from_json_type" @@ -79,9 +82,15 @@ def extract_types(cls, is_to=True): is_list = False if is_dataclass_json(t): - yield k, Converter(is_list=is_list, supp_kw=True, func=getattr(t, method_name)), field.default + yield k, Converter( + is_list=is_list, supp_kw=True, func=getattr(t, method_name) + ), field.default elif t in TYPE_CONVERTERS: - yield k, Converter(is_list=is_list, 
supp_kw=False, func=getattr(TYPE_CONVERTERS[t], func_name)), field.default + yield k, Converter( + is_list=is_list, + supp_kw=False, + func=getattr(TYPE_CONVERTERS[t], func_name), + ), field.default else: if is_to: yield k, nohop, field.default @@ -118,7 +127,11 @@ def _setup(cls): cls._late_init_from = list(t[:2] for t in extract_types(cls, is_to=False)) for k, convert in cls._late_init_from: setattr(cls, k, LazyAttribute(k, convert)) - cls._prop_to_json = {field.name: field.metadata['json'] for field in dc.fields(cls) if 'json' in field.metadata} + cls._prop_to_json = { + field.name: field.metadata["json"] + for field in dc.fields(cls) + if "json" in field.metadata + } cls._json_to_prop = {v: k for k, v in cls._prop_to_json.items()} cls._late_init_to = list(extract_types(cls, is_to=True)) cls._valid_params = {f.name for f in dc.fields(cls)} diff --git a/lightkube/core/exceptions.py b/lightkube/core/exceptions.py index 2b713a3..5c3a2fd 100644 --- a/lightkube/core/exceptions.py +++ b/lightkube/core/exceptions.py @@ -1,6 +1,7 @@ """ Exceptions. """ + import httpx from .internal_models import meta_v1 @@ -10,14 +11,16 @@ class ConfigError(Exception): """ Configuration specific errors. 
""" + pass class ApiError(httpx.HTTPStatusError): - status: 'meta_v1.Status' + status: "meta_v1.Status" def __init__( - self, request: httpx.Request = None, response: httpx.Response = None) -> None: + self, request: httpx.Request = None, response: httpx.Response = None + ) -> None: self.status = meta_v1.Status.from_dict(response.json()) super().__init__(self.status.message, request=request, response=response) @@ -50,5 +53,5 @@ def __init__(self, name, messages): self.messages = messages def __str__(self): - messages = '; '.join(self.messages) - return f'{self.name} has failure condition(s): {messages}' + messages = "; ".join(self.messages) + return f"{self.name} has failure condition(s): {messages}" diff --git a/lightkube/core/generic_client.py b/lightkube/core/generic_client.py index 648c58c..20a5429 100644 --- a/lightkube/core/generic_client.py +++ b/lightkube/core/generic_client.py @@ -11,32 +11,35 @@ from ..config.kubeconfig import KubeConfig, SingleConfig, DEFAULT_KUBECONFIG from ..config import client_adapter from .exceptions import ApiError -from .selector import build_selector from ..types import OnErrorAction, OnErrorHandler, on_error_raise, PatchType -ALL_NS = '*' +ALL_NS = "*" def transform_exception(e: httpx.HTTPError): - if isinstance(e, httpx.HTTPStatusError) and e.response.headers['Content-Type'] == 'application/json': + if ( + isinstance(e, httpx.HTTPStatusError) + and e.response.headers["Content-Type"] == "application/json" + ): return ApiError(request=e.request, response=e.response) return e METHOD_MAPPING = { - 'delete': 'DELETE', - 'deletecollection': 'DELETE', - 'get': 'GET', - 'global_list': 'GET', - 'global_watch': 'GET', - 'list': 'GET', - 'patch': 'PATCH', - 'post': 'POST', - 'put': 'PUT', - 'watch': 'GET' + "delete": "DELETE", + "deletecollection": "DELETE", + "get": "GET", + "global_list": "GET", + "global_watch": "GET", + "list": "GET", + "patch": "PATCH", + "post": "POST", + "put": "PUT", + "watch": "GET", } + @dataclass class 
BasicRequest: method: str @@ -49,7 +52,7 @@ class BasicRequest: class WatchDriver: def __init__(self, br: BasicRequest, build_request, lazy): - self._version = br.params.get('resourceVersion') + self._version = br.params.get("resourceVersion") self._convert = br.response_type.from_dict self._br = br self._build_request = build_request @@ -58,14 +61,14 @@ def __init__(self, br: BasicRequest, build_request, lazy): def get_request(self, timeout): br = self._br if self._version is not None: - br.params['resourceVersion'] = self._version + br.params["resourceVersion"] = self._version return self._build_request(br.method, br.url, params=br.params, timeout=timeout) def process_one_line(self, line): line = json.loads(line) - tp = line['type'] - obj = line['object'] - self._version = obj['metadata']['resourceVersion'] + tp = line["type"] + obj = line["object"] + self._version = obj["metadata"]["resourceVersion"] return tp, self._convert(obj, lazy=self._lazy) @@ -99,8 +102,17 @@ def __init__( self._dry_run = dry_run self.namespace = namespace if namespace else config.namespace - def prepare_request(self, method, res: Type[r.Resource] = None, obj=None, name=None, namespace=None, - watch: bool = False, params: dict = None, headers: dict = None) -> BasicRequest: + def prepare_request( + self, + method, + res: Type[r.Resource] = None, + obj=None, + name=None, + namespace=None, + watch: bool = False, + params: dict = None, + headers: dict = None, + ) -> BasicRequest: if params is not None: params = {k: v for k, v in params.items() if v is not None} else: @@ -110,7 +122,9 @@ def prepare_request(self, method, res: Type[r.Resource] = None, obj=None, name=N data = None if res is None: if obj is None: - raise ValueError("At least a resource or an instance of a resource need to be provided") + raise ValueError( + "At least a resource or an instance of a resource need to be provided" + ) res = obj.__class__ namespaced = issubclass(res, (r.NamespacedResource, r.NamespacedSubResource)) @@ 
-118,8 +132,10 @@ def prepare_request(self, method, res: Type[r.Resource] = None, obj=None, name=N if namespace == ALL_NS: if not issubclass(res, r.NamespacedResourceG): raise ValueError(f"Class {res} doesn't support global {method}") - if method not in ('list', 'watch'): - raise ValueError("Only methods 'list' and 'watch' can be called for all namespaces") + if method not in ("list", "watch"): + raise ValueError( + "Only methods 'list' and 'watch' can be called for all namespaces" + ) real_method = "global_watch" if watch else "global_" + method else: real_method = "watch" if watch else method @@ -129,56 +145,66 @@ def prepare_request(self, method, res: Type[r.Resource] = None, obj=None, name=N if watch: raise ValueError(f"Resource '{res.__name__}' is not watchable") else: - raise ValueError(f"method '{method}' not supported by resource '{res.__name__}'") + raise ValueError( + f"method '{method}' not supported by resource '{res.__name__}'" + ) if watch: - params['watch'] = "true" + params["watch"] = "true" if api_info.parent is None: base = api_info.resource else: base = api_info.parent - if base.group == '': + if base.group == "": path = ["api", base.version] else: path = ["apis", base.group, base.version] if namespaced and namespace != ALL_NS: - if method in ('post', 'put') and obj.metadata.namespace is not None: + if method in ("post", "put") and obj.metadata.namespace is not None: if namespace is None: namespace = obj.metadata.namespace elif namespace != obj.metadata.namespace: - raise ValueError(f"The namespace value '{namespace}' differ from the " - f"namespace in the object metadata '{obj.metadata.namespace}'") + raise ValueError( + f"The namespace value '{namespace}' differ from the " + f"namespace in the object metadata '{obj.metadata.namespace}'" + ) if namespace is None: namespace = self.namespace path.extend(["namespaces", namespace]) - if method in ('post', 'put', 'patch'): - if self._field_manager is not None and 'fieldManager' not in params: - 
params['fieldManager'] = self._field_manager + if method in ("post", "put", "patch"): + if self._field_manager is not None and "fieldManager" not in params: + params["fieldManager"] = self._field_manager if self._dry_run is True and "dryRun" not in params: params["dryRun"] = "All" - if method == 'patch' and headers['Content-Type'] == PatchType.APPLY.value and 'fieldManager' not in params: - raise ValueError('Parameter "field_manager" is required for PatchType.APPLY') + if ( + method == "patch" + and headers["Content-Type"] == PatchType.APPLY.value + and "fieldManager" not in params + ): + raise ValueError( + 'Parameter "field_manager" is required for PatchType.APPLY' + ) if obj is None: raise ValueError("obj is required for post, put or patch") - if method == 'patch' and not isinstance(obj, r.Resource): + if method == "patch" and not isinstance(obj, r.Resource): data = obj else: data = obj.to_dict() # The following block, ensures that apiVersion and kind are always set. # this is needed as k8s fails if this data are not provided for objects derived by CRDs (Issue #27) - if 'apiVersion' not in data: - data['apiVersion'] = api_info.resource.api_version - if 'kind' not in data: - data['kind'] = api_info.resource.kind + if "apiVersion" not in data: + data["apiVersion"] = api_info.resource.api_version + if "kind" not in data: + data["kind"] = api_info.resource.kind path.append(api_info.plural) - if method in ('delete', 'get', 'patch', 'put') or api_info.action: - if name is None and method == 'put': + if method in ("delete", "get", "patch", "put") or api_info.action: + if name is None and method == "put": name = obj.metadata.name if name is None: raise ValueError("resource name not defined") @@ -188,10 +214,17 @@ def prepare_request(self, method, res: Type[r.Resource] = None, obj=None, name=N path.append(api_info.action) http_method = METHOD_MAPPING[method] - if http_method == 'DELETE': + if http_method == "DELETE": res = None - return BasicRequest(method=http_method, 
url="/".join(path), params=params, response_type=res, data=data, headers=headers) + return BasicRequest( + method=http_method, + url="/".join(path), + params=params, + response_type=res, + data=data, + headers=headers, + ) @staticmethod def raise_for_status(resp): @@ -201,7 +234,9 @@ def raise_for_status(resp): raise transform_exception(e) def build_adapter_request(self, br: BasicRequest): - return self._client.build_request(br.method, br.url, params=br.params, json=br.data, headers=br.headers) + return self._client.build_request( + br.method, br.url, params=br.params, json=br.data, headers=br.headers + ) def convert_to_resource(self, res: Type[r.Resource], item: dict) -> r.Resource: resource_def = r.api_info(res).resource @@ -216,13 +251,13 @@ def handle_response(self, method, resp, br): # TODO: delete/deletecollection actions normally return a Status object, we may want to return it as well return data = resp.json() - if method == 'list': - if 'metadata' in data and data['metadata'].get('continue'): + if method == "list": + if "metadata" in data and data["metadata"].get("continue"): cont = True - br.params['continue'] = data['metadata']['continue'] + br.params["continue"] = data["metadata"]["continue"] else: cont = False - return cont, (self.convert_to_resource(res, obj) for obj in data['items']) + return cont, (self.convert_to_resource(res, obj) for obj in data["items"]) else: if res is not None: return self.convert_to_resource(res, data) @@ -254,9 +289,20 @@ def watch(self, br: BasicRequest, on_error: OnErrorHandler = on_error_raise): time.sleep(handle_error.sleep) continue - def request(self, method, res: Type[r.Resource] = None, obj=None, name=None, namespace=None, watch: bool = False, - headers: dict = None, params: dict = None) -> Any: - br = self.prepare_request(method, res, obj, name, namespace, watch, headers=headers, params=params) + def request( + self, + method, + res: Type[r.Resource] = None, + obj=None, + name=None, + namespace=None, + watch: bool = 
False, + headers: dict = None, + params: dict = None, + ) -> Any: + br = self.prepare_request( + method, res, obj, name, namespace, watch, headers=headers, params=params + ) req = self.build_adapter_request(br) resp = self.send(req) return self.handle_response(method, resp, br) @@ -266,7 +312,7 @@ def list(self, br: BasicRequest) -> Any: while cont: req = self.build_adapter_request(br) resp = self.send(req) - cont, chunk = self.handle_response('list', resp, br) + cont, chunk = self.handle_response("list", resp, br) yield from chunk @@ -300,9 +346,20 @@ async def watch(self, br: BasicRequest, on_error: OnErrorHandler = on_error_rais finally: await resp.aclose() - async def request(self, method, res: Type[r.Resource] = None, obj=None, name=None, namespace=None, - watch: bool = False, headers: dict = None, params: dict = None) -> Any: - br = self.prepare_request(method, res, obj, name, namespace, watch, headers=headers, params=params) + async def request( + self, + method, + res: Type[r.Resource] = None, + obj=None, + name=None, + namespace=None, + watch: bool = False, + headers: dict = None, + params: dict = None, + ) -> Any: + br = self.prepare_request( + method, res, obj, name, namespace, watch, headers=headers, params=params + ) req = self.build_adapter_request(br) resp = await self.send(req) return self.handle_response(method, resp, br) @@ -312,7 +369,7 @@ async def list(self, br: BasicRequest) -> Any: while cont: req = self.build_adapter_request(br) resp = await self.send(req) - cont, chunk = self.handle_response('list', resp, br) + cont, chunk = self.handle_response("list", resp, br) for item in chunk: yield item diff --git a/lightkube/core/internal_models.py b/lightkube/core/internal_models.py index 6d37dbe..abae505 100644 --- a/lightkube/core/internal_models.py +++ b/lightkube/core/internal_models.py @@ -4,25 +4,24 @@ from ..models import meta_v1, autoscaling_v1, core_v1 except: - if sys.modules["__main__"].__package__ != 'mkdocs': # we ignore this import 
error during documentation generation + if ( + sys.modules["__main__"].__package__ != "mkdocs" + ): # we ignore this import error during documentation generation raise from unittest import mock - class ObjectMeta: pass meta_v1 = mock.Mock() meta_v1.ObjectMeta = ObjectMeta - class Scale: pass autoscaling_v1 = mock.Mock() autoscaling_v1.Scale = Scale - class ResourceRequirements: pass diff --git a/lightkube/core/internal_resources.py b/lightkube/core/internal_resources.py index b9ddbe5..ef54fde 100644 --- a/lightkube/core/internal_resources.py +++ b/lightkube/core/internal_resources.py @@ -8,18 +8,18 @@ except: from ..resources import apiextensions_v1beta1 as apiextensions except: - if sys.modules["__main__"].__package__ != 'mkdocs': # we ignore this import error during documentation generation + if ( + sys.modules["__main__"].__package__ != "mkdocs" + ): # we ignore this import error during documentation generation raise from unittest import mock - class CustomResourceDefinition: pass apiextensions = mock.Mock() apiextensions.CustomResourceDefinition = CustomResourceDefinition - class PodLog: pass diff --git a/lightkube/core/resource_registry.py b/lightkube/core/resource_registry.py index 32c9f7b..c966f4c 100644 --- a/lightkube/core/resource_registry.py +++ b/lightkube/core/resource_registry.py @@ -6,7 +6,8 @@ AnyResource = Union[res.NamespacedResource, res.GlobalResource] AnyResourceType = Type[AnyResource] -AnyResourceTypeVar = TypeVar('AnyResourceTypeVar', bound=AnyResourceType) +AnyResourceTypeVar = TypeVar("AnyResourceTypeVar", bound=AnyResourceType) + def _load_internal_resource(version, kind): if "/" in version: @@ -17,13 +18,16 @@ def _load_internal_resource(version, kind): group = group.replace(".", "_") module_name = "_".join([group, version_n]) else: - module_name = f'core_{version}' + module_name = f"core_{version}" - module = importlib.import_module(f'lightkube.resources.{module_name.lower()}') + module = 
importlib.import_module(f"lightkube.resources.{module_name.lower()}") try: return getattr(module, kind) except AttributeError: - raise LoadResourceError(f"Cannot find resource kind '{kind}' in module {module.__name__}") + raise LoadResourceError( + f"Cannot find resource kind '{kind}' in module {module.__name__}" + ) + def _maybe_internal(version): if "/" not in version: @@ -35,8 +39,8 @@ def _maybe_internal(version): class ResourceRegistry: - """Resource Registry used to load standard resources or to register custom resources - """ + """Resource Registry used to load standard resources or to register custom resources""" + _registry: dict def __init__(self): @@ -52,21 +56,26 @@ def register(self, resource: AnyResourceTypeVar) -> AnyResourceTypeVar: **returns** The `resource` class provided """ info = resource._api_info - version = f'{info.resource.group}/{info.resource.version}' if info.resource.group else info.resource.version + version = ( + f"{info.resource.group}/{info.resource.version}" + if info.resource.group + else info.resource.version + ) res_key = (version, info.resource.kind) if res_key in self._registry: registered_resource = self._registry[res_key] - if registered_resource is resource: # already present + if registered_resource is resource: # already present return registered_resource - raise ValueError(f"Another class for resource '{info.resource.kind}' is already registered") + raise ValueError( + f"Another class for resource '{info.resource.kind}' is already registered" + ) self._registry[res_key] = resource return resource def clear(self): - """Clear the registry from all registered resources - """ + """Clear the registry from all registered resources""" self._registry.clear() def get(self, version: str, kind: str) -> Optional[Type[AnyResource]]: @@ -102,7 +111,10 @@ def load(self, version, kind) -> Optional[Type[AnyResource]]: except ImportError: pass - raise LoadResourceError(f"Cannot find resource {kind} of group {version}. 
" -                              "If using a CRD, ensure a generic resource is defined.") +    raise LoadResourceError( +        f"Cannot find resource {kind} of group {version}. " +        "If using a CRD, ensure a generic resource is defined." +    ) + resource_registry = ResourceRegistry() diff --git a/lightkube/core/schema.py b/lightkube/core/schema.py index a260518..47bda46 100644 --- a/lightkube/core/schema.py +++ b/lightkube/core/schema.py @@ -3,6 +3,5 @@ These dependencies are here because we may decide to replace dataclasses with something else in the future """ -from dataclasses import dataclass, field -from .dataclasses_dict import DataclassDictMixIn as DictMixin + diff --git a/lightkube/core/selector.py b/lightkube/core/selector.py index 97933c5..8f4e73a 100644 --- a/lightkube/core/selector.py +++ b/lightkube/core/selector.py @@ -2,8 +2,8 @@ from collections.abc import Iterable from lightkube import operators -FIELDS_SUPPORT = ('equal', 'not_equal', 'not_in') -FIELDS_SUPPORT_STR = ', '.join(f'"{fs}"' for fs in FIELDS_SUPPORT) +FIELDS_SUPPORT = ("equal", "not_equal", "not_in") +FIELDS_SUPPORT_STR = ", ".join(f'"{fs}"' for fs in FIELDS_SUPPORT) def build_selector(pairs: Union[List, Dict], for_fields=False): @@ -19,14 +19,20 @@ def build_selector(pairs: Union[List, Dict], for_fields=False): v = operators.in_(v) if not isinstance(v, operators.Operator): - raise ValueError(f"selector value '{v}' should be str, None, Iterable or instance of operator") + raise ValueError( + f"selector value '{v}' should be str, None, Iterable or instance of operator" + ) if for_fields and v.op_name not in FIELDS_SUPPORT: - raise ValueError(f"parameter 'fields' only support operators {FIELDS_SUPPORT_STR}") + raise ValueError( + f"parameter 'fields' only support operators {FIELDS_SUPPORT_STR}" + ) - if for_fields and v.op_name == 'not_in':  # not_in can be implement using several != + if ( + for_fields and v.op_name == "not_in" + ):  # not_in can be implemented using several != for item in v.value: 
res.append(operators.not_equal(item).encode(k)) else: res.append(v.encode(k)) - return ','.join(res) + return ",".join(res) diff --git a/lightkube/core/sort_objects.py b/lightkube/core/sort_objects.py index 2783ba0..84ca3dd 100644 --- a/lightkube/core/sort_objects.py +++ b/lightkube/core/sort_objects.py @@ -1,9 +1,10 @@ -from collections import defaultdict from typing import List from ..core import resource as r -def sort_objects(objs: List[r.Resource], by: str = "kind", reverse: bool = False) -> List[r.Resource]: +def sort_objects( + objs: List[r.Resource], by: str = "kind", reverse: bool = False +) -> List[r.Resource]: """Sorts a list of resource objects by a sorting schema, returning a new list **parameters** diff --git a/lightkube/core/typing_extra.py b/lightkube/core/typing_extra.py index 0dbfe84..b427798 100644 --- a/lightkube/core/typing_extra.py +++ b/lightkube/core/typing_extra.py @@ -1,4 +1,5 @@ """This module provides a compatibility layer for functions in typing that appeared on 3.8""" + import collections.abc import sys @@ -6,14 +7,14 @@ from typing import get_args, get_origin else: + def get_origin(tp): - if hasattr(tp, '__origin__'): + if hasattr(tp, "__origin__"): return tp.__origin__ return None - def get_args(tp): - if hasattr(tp, '__args__'): + if hasattr(tp, "__args__"): res = tp.__args__ if get_origin(tp) is collections.abc.Callable and res[0] is not Ellipsis: res = (list(res[:-1]), res[-1]) diff --git a/lightkube/generic_resource.py b/lightkube/generic_resource.py index 6f5a6d4..6d349a1 100644 --- a/lightkube/generic_resource.py +++ b/lightkube/generic_resource.py @@ -9,12 +9,12 @@ __all__ = [ - 'async_load_in_cluster_generic_resources', - 'create_global_resource', - 'create_namespaced_resource', - 'create_resources_from_crd', - 'get_generic_resource', - 'load_in_cluster_generic_resources', + "async_load_in_cluster_generic_resources", + "create_global_resource", + "create_namespaced_resource", + "create_resources_from_crd", + 
"get_generic_resource", + "load_in_cluster_generic_resources", ] @@ -30,15 +30,22 @@ def get_generic_resource(version, kind): **returns** class representing the generic resource or `None` if it's not found """ resource = resource_registry.get(version, kind) - if resource is None or not issubclass(resource, (GenericGlobalResource, GenericNamespacedResource)): + if resource is None or not issubclass( + resource, (GenericGlobalResource, GenericNamespacedResource) + ): return None return resource class Generic(dict): @overload - def __init__(self, apiVersion: str=None, kind: str=None, - metadata: meta_v1.ObjectMeta=None, **kwargs): + def __init__( + self, + apiVersion: str = None, + kind: str = None, + metadata: meta_v1.ObjectMeta = None, + **kwargs, + ): pass def __init__(self, *args, **kwargs): @@ -46,19 +53,19 @@ def __init__(self, *args, **kwargs): @property def apiVersion(self) -> str: - return self.get('apiVersion') + return self.get("apiVersion") @property def kind(self) -> str: - return self.get('kind') + return self.get("kind") @property def status(self) -> str: - return self.get('status') + return self.get("status") @property def metadata(self) -> Optional[meta_v1.ObjectMeta]: - meta = self.get('metadata') + meta = self.get("metadata") if meta is None: return None elif isinstance(meta, meta_v1.ObjectMeta): @@ -76,19 +83,27 @@ def from_dict(cls, d: dict, lazy=True): def to_dict(self, dict_factory=dict): d = dict_factory(self) - if 'metadata' in d and isinstance(d['metadata'], meta_v1.ObjectMeta): - d['metadata'] = d['metadata'].to_dict(dict_factory) + if "metadata" in d and isinstance(d["metadata"], meta_v1.ObjectMeta): + d["metadata"] = d["metadata"].to_dict(dict_factory) return d def create_api_info(group, version, kind, plural, verbs=None) -> res.ApiInfo: if verbs is None: - verbs = ['delete', 'deletecollection', 'get', 'global_list', 'global_watch', 'list', 'patch', - 'post', 'put', 'watch'] + verbs = [ + "delete", + "deletecollection", + "get", + 
"global_list", + "global_watch", + "list", + "patch", + "post", + "put", + "watch", + ] return res.ApiInfo( - resource=res.ResourceDef(group, version, kind), - plural=plural, - verbs=verbs + resource=res.ResourceDef(group, version, kind), plural=plural, verbs=verbs ) @@ -111,14 +126,20 @@ class GenericNamespacedStatus(res.NamespacedResourceG, Generic): def _create_subresource(main_class, parent_info: res.ApiInfo, action): class TmpName(main_class): _api_info = res.ApiInfo( - resource=parent_info.resource if action == 'status' else res.ResourceDef('autoscaling', 'v1', 'Scale'), + resource=( + parent_info.resource + if action == "status" + else res.ResourceDef("autoscaling", "v1", "Scale") + ), parent=parent_info.resource, plural=parent_info.plural, - verbs=['get', 'patch', 'put'], + verbs=["get", "patch", "put"], action=action, ) - TmpName.__name__ = TmpName.__qualname__ = f"{parent_info.resource.kind}{action.capitalize()}" + TmpName.__name__ = TmpName.__qualname__ = ( + f"{parent_info.resource.kind}{action.capitalize()}" + ) return TmpName @@ -133,11 +154,15 @@ class GenericNamespacedResource(res.NamespacedResourceG, Generic): def _api_info_signature(api_info: res.ApiInfo, namespaced: bool): - return (namespaced, api_info.plural, tuple(api_info.verbs) if api_info.verbs else None) + return ( + namespaced, + api_info.plural, + tuple(api_info.verbs) if api_info.verbs else None, + ) def _create_resource(namespaced, group, version, kind, plural, verbs=None) -> Any: - model = resource_registry.get(f'{group}/{version}', kind) + model = resource_registry.get(f"{group}/{version}", kind) api_info = create_api_info(group, version, kind, plural, verbs=verbs) signature = _api_info_signature(api_info, namespaced) @@ -145,26 +170,37 @@ def _create_resource(namespaced, group, version, kind, plural, verbs=None) -> An curr_namespaced = issubclass(model, res.NamespacedResource) curr_signature = _api_info_signature(model._api_info, curr_namespaced) if curr_signature != signature: - 
raise ValueError(f"Resource {kind} already created but with different signature") + raise ValueError( + f"Resource {kind} already created but with different signature" + ) return model if namespaced: - main, status, scale = GenericNamespacedResource, GenericNamespacedStatus, GenericNamespacedScale + main, status, scale = ( + GenericNamespacedResource, + GenericNamespacedStatus, + GenericNamespacedScale, + ) else: - main, status, scale = GenericGlobalResource, GenericGlobalStatus, GenericGlobalScale + main, status, scale = ( + GenericGlobalResource, + GenericGlobalStatus, + GenericGlobalScale, + ) class TmpName(main): _api_info = create_api_info(group, version, kind, plural, verbs=verbs) - Scale = _create_subresource(scale, _api_info, action='scale') - Status = _create_subresource(status, _api_info, action='status') + Scale = _create_subresource(scale, _api_info, action="scale") + Status = _create_subresource(status, _api_info, action="status") TmpName.__name__ = TmpName.__qualname__ = kind return resource_registry.register(TmpName) -def create_global_resource(group: str, version: str, kind: str, plural: str, verbs=None) \ - -> Type[GenericGlobalResource]: +def create_global_resource( + group: str, version: str, kind: str, plural: str, verbs=None +) -> Type[GenericGlobalResource]: """Create a new class representing a global resource with the provided specifications. **Parameters** @@ -176,12 +212,12 @@ def create_global_resource(group: str, version: str, kind: str, plural: str, ver **returns** Subclass of `GenericGlobalResource`. 
""" - return _create_resource( - False, group, version, kind, plural, verbs=verbs) + return _create_resource(False, group, version, kind, plural, verbs=verbs) -def create_namespaced_resource(group: str, version: str, kind: str, plural: str, verbs=None) \ - -> Type[GenericNamespacedResource]: +def create_namespaced_resource( + group: str, version: str, kind: str, plural: str, verbs=None +) -> Type[GenericNamespacedResource]: """Create a new class representing a namespaced resource with the provided specifications. **Parameters** @@ -193,8 +229,7 @@ def create_namespaced_resource(group: str, version: str, kind: str, plural: str, **returns** Subclass of `GenericNamespacedResource`. """ - return _create_resource( - True, group, version, kind, plural, verbs=verbs) + return _create_resource(True, group, version, kind, plural, verbs=verbs) def load_in_cluster_generic_resources(client: Client): diff --git a/lightkube/operators.py b/lightkube/operators.py index f728666..aa02ff9 100644 --- a/lightkube/operators.py +++ b/lightkube/operators.py @@ -1,6 +1,6 @@ from typing import Iterable -__all__ = ['in_', 'not_in', 'exists', 'not_exists', 'equal', 'not_equal'] +__all__ = ["in_", "not_in", "exists", "not_exists", "equal", "not_equal"] class Operator: @@ -28,25 +28,24 @@ def encode(self, key): def in_(values: Iterable) -> SequenceOperator: - return SequenceOperator('in_', 'in', sorted(values)) + return SequenceOperator("in_", "in", sorted(values)) def not_in(values: Iterable) -> SequenceOperator: - return SequenceOperator('not_in', 'notin', sorted(values)) + return SequenceOperator("not_in", "notin", sorted(values)) def exists() -> UnaryOperator: - return UnaryOperator('exists', '') + return UnaryOperator("exists", "") def not_exists() -> UnaryOperator: - return UnaryOperator('not_exists', '!') + return UnaryOperator("not_exists", "!") def equal(value: str) -> BinaryOperator: - return BinaryOperator('equal', '=', value) + return BinaryOperator("equal", "=", value) def 
not_equal(value: str) -> BinaryOperator: - return BinaryOperator('not_equal', '!=', value) - + return BinaryOperator("not_equal", "!=", value) diff --git a/lightkube/types.py b/lightkube/types.py index 1aa28e4..cde9540 100644 --- a/lightkube/types.py +++ b/lightkube/types.py @@ -4,20 +4,22 @@ class PatchType(enum.Enum): - JSON = 'application/json-patch+json' - MERGE = 'application/merge-patch+json' - STRATEGIC = 'application/strategic-merge-patch+json' - APPLY = 'application/apply-patch+yaml' + JSON = "application/json-patch+json" + MERGE = "application/merge-patch+json" + STRATEGIC = "application/strategic-merge-patch+json" + APPLY = "application/apply-patch+yaml" + class CascadeType(enum.Enum): - ORPHAN = 'Orphan' - BACKGROUND = 'Background' - FOREGROUND = 'Foreground' + ORPHAN = "Orphan" + BACKGROUND = "Background" + FOREGROUND = "Foreground" + class OnErrorAction(enum.Enum): - RETRY = 0 # retry to perform the API call again from the last version - STOP = 1 # stop silently the iterator - RAISE = 2 # raise the error on the caller scope + RETRY = 0 # retry to perform the API call again from the last version + STOP = 1 # stop silently the iterator + RAISE = 2 # raise the error on the caller scope @dataclass diff --git a/lightkube/utils/quantity.py b/lightkube/utils/quantity.py index dafdadb..a84821e 100644 --- a/lightkube/utils/quantity.py +++ b/lightkube/utils/quantity.py @@ -18,7 +18,6 @@ "E": (10, 18), # 1000^6 "Z": (10, 21), # 1000^7 "Y": (10, 24), # 1000^8 - # Bibytes "Ki": (1024, 1), # 2^10 "Mi": (1024, 2), # 2^20 @@ -31,7 +30,7 @@ } # Pre-calculate multipliers and store as decimals. 
-MULTIPLIERS = {k: decimal.Decimal(v[0])**v[1] for k, v in MULTIPLIERS.items()} +MULTIPLIERS = {k: decimal.Decimal(v[0]) ** v[1] for k, v in MULTIPLIERS.items()} def parse_quantity(quantity: Optional[str]) -> Optional[decimal.Decimal]: @@ -81,7 +80,9 @@ def parse_quantity(quantity: Optional[str]) -> Optional[decimal.Decimal]: raise ValueError("Invalid numerical value") from e -def _equals_canonically(first_dict: Optional[dict], second_dict: Optional[dict]) -> bool: +def _equals_canonically( + first_dict: Optional[dict], second_dict: Optional[dict] +) -> bool: """Compare resource dicts such as 'limits' or 'requests'.""" if first_dict == second_dict: # This covers two cases: (1) both args are None; (2) both args are identical dicts. @@ -101,13 +102,13 @@ def _equals_canonically(first_dict: Optional[dict], second_dict: Optional[dict]) @overload -def equals_canonically(first: ResourceRequirements, second: ResourceRequirements) -> bool: - ... +def equals_canonically( + first: ResourceRequirements, second: ResourceRequirements +) -> bool: ... @overload -def equals_canonically(first: Optional[dict], second: Optional[dict]) -> bool: - ... +def equals_canonically(first: Optional[dict], second: Optional[dict]) -> bool: ... 
def equals_canonically(first, second): @@ -139,11 +140,18 @@ def equals_canonically(first, second): if isinstance(first, (dict, type(None))) and isinstance(second, (dict, type(None))): # Args are 'limits' or 'requests' dicts return _equals_canonically(first, second) - elif isinstance(first, ResourceRequirements) and isinstance(second, ResourceRequirements): + elif isinstance(first, ResourceRequirements) and isinstance( + second, ResourceRequirements + ): # Args are ResourceRequirements, which may contain 'limits' and 'requests' dicts ks = ("limits", "requests") - return all(_equals_canonically(getattr(first, k), getattr(second, k)) for k in ks) + return all( + _equals_canonically(getattr(first, k), getattr(second, k)) for k in ks + ) else: - raise TypeError("unsupported operand type(s) for canonical comparison: '{}' and '{}'".format( - first.__class__.__name__, second.__class__.__name__, - )) + raise TypeError( + "unsupported operand type(s) for canonical comparison: '{}' and '{}'".format( + first.__class__.__name__, + second.__class__.__name__, + ) + )