Skip to content

Commit

Permalink
linting
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeldmitry authored and gtsystem committed Aug 25, 2024
1 parent e0279cb commit e324b96
Show file tree
Hide file tree
Showing 20 changed files with 835 additions and 393 deletions.
43 changes: 28 additions & 15 deletions lightkube/codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,22 @@

import yaml

from .generic_resource import GenericGlobalResource, GenericNamespacedResource, create_resources_from_crd
from .generic_resource import (
GenericGlobalResource,
GenericNamespacedResource,
create_resources_from_crd,
)
from .core.exceptions import LoadResourceError
from .core.resource_registry import resource_registry

__all__ = [
'from_dict',
'load_all_yaml',
'dump_all_yaml',
'resource_registry'
]
__all__ = ["from_dict", "load_all_yaml", "dump_all_yaml", "resource_registry"]

try:
import jinja2
except ImportError:
jinja2 = None

REQUIRED_ATTR = ('apiVersion', 'kind')
REQUIRED_ATTR = ("apiVersion", "kind")

AnyResource = Union[GenericGlobalResource, GenericNamespacedResource]

Expand All @@ -35,16 +34,23 @@ def from_dict(d: dict) -> AnyResource:
always required.
"""
if not isinstance(d, Mapping):
raise LoadResourceError(f"Invalid resource definition, not a dict.")
raise LoadResourceError("Invalid resource definition, not a dict.")
for attr in REQUIRED_ATTR:
if attr not in d:
raise LoadResourceError(f"Invalid resource definition, key '{attr}' missing.")
raise LoadResourceError(
f"Invalid resource definition, key '{attr}' missing."
)

model = resource_registry.load(d['apiVersion'], d['kind'])
model = resource_registry.load(d["apiVersion"], d["kind"])
return model.from_dict(d)


def load_all_yaml(stream: Union[str, TextIO], context: dict = None, template_env = None, create_resources_for_crds: bool = False) -> List[AnyResource]:
def load_all_yaml(
stream: Union[str, TextIO],
context: dict = None,
template_env=None,
create_resources_for_crds: bool = False,
) -> List[AnyResource]:
"""Load kubernetes resource objects defined as YAML. See `from_dict` regarding how resource types are detected.
Returns a list of resource objects or raise a `LoadResourceError`. Skips any empty YAML documents in the
stream, returning an empty list if all YAML documents are empty. Deep parse any items from .*List resources.
Expand Down Expand Up @@ -80,7 +86,10 @@ def _flatten(objects: Iterator) -> List[AnyResource]:
res = from_dict(obj)
resources.append(res)

if create_resources_for_crds is True and res.kind == "CustomResourceDefinition":
if (
create_resources_for_crds is True
and res.kind == "CustomResourceDefinition"
):
create_resources_from_crd(res)
return resources

Expand All @@ -101,7 +110,9 @@ def dump_all_yaml(resources: List[AnyResource], stream: TextIO = None, indent=2)
return yaml.safe_dump_all(res, stream, indent=indent)


def _template(stream: Union[str, TextIO], context: dict = None, template_env = None) -> List[AnyResource]:
def _template(
stream: Union[str, TextIO], context: dict = None, template_env=None
) -> List[AnyResource]:
"""
Template a stream using jinja2 and the given context
"""
Expand All @@ -113,5 +124,7 @@ def _template(stream: Union[str, TextIO], context: dict = None, template_env = N
elif not isinstance(template_env, jinja2.Environment):
raise LoadResourceError("template_env is not a valid jinja2 template")

tmpl = template_env.from_string(stream if isinstance(stream, str) else stream.read())
tmpl = template_env.from_string(
stream if isinstance(stream, str) else stream.read()
)
return tmpl.render(**context)
35 changes: 26 additions & 9 deletions lightkube/config/client_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
from ..core.exceptions import ConfigError


def Client(config: SingleConfig, timeout: httpx.Timeout, trust_env: bool = True) -> httpx.Client:
def Client(
config: SingleConfig, timeout: httpx.Timeout, trust_env: bool = True
) -> httpx.Client:
return httpx.Client(**httpx_parameters(config, timeout, trust_env))


def AsyncClient(config: SingleConfig, timeout: httpx.Timeout, trust_env: bool = True) -> httpx.AsyncClient:
def AsyncClient(
config: SingleConfig, timeout: httpx.Timeout, trust_env: bool = True
) -> httpx.AsyncClient:
return httpx.AsyncClient(**httpx_parameters(config, timeout, trust_env))


Expand All @@ -41,18 +45,26 @@ def auth_flow(self, request: httpx.Request):

async def async_check_output(command, env):
PIPE = asyncio.subprocess.PIPE
proc = await asyncio.create_subprocess_exec(*command, env=env, stdin=None, stdout=PIPE, stderr=PIPE)
proc = await asyncio.create_subprocess_exec(
*command, env=env, stdin=None, stdout=PIPE, stderr=PIPE
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
raise ConfigError(f"Exec {command[0]} returned {proc.returncode}: {stderr.decode()}")
raise ConfigError(
f"Exec {command[0]} returned {proc.returncode}: {stderr.decode()}"
)
return stdout


def sync_check_output(command, env):
proc = subprocess.Popen(command, env=env, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc = subprocess.Popen(
command, env=env, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
raise ConfigError(f"Exec {command[0]} returned {proc.returncode}: {stderr.decode()}")
raise ConfigError(
f"Exec {command[0]} returned {proc.returncode}: {stderr.decode()}"
)
return stdout


Expand All @@ -63,8 +75,13 @@ def __init__(self, exec: UserExec):

def _prepare(self):
exec = self._exec
if exec.apiVersion not in ("client.authentication.k8s.io/v1alpha1", "client.authentication.k8s.io/v1beta1"):
raise ConfigError(f"auth exec api version {exec.apiVersion} not implemented")
if exec.apiVersion not in (
"client.authentication.k8s.io/v1alpha1",
"client.authentication.k8s.io/v1beta1",
):
raise ConfigError(
f"auth exec api version {exec.apiVersion} not implemented"
)
cmd_env_vars = dict(os.environ)
cmd_env_vars.update((var.name, var.value) for var in exec.env)
# TODO: add support for passing KUBERNETES_EXEC_INFO env var
Expand Down Expand Up @@ -121,7 +138,7 @@ def user_cert(user: User, abs_file):
if user.client_cert or user.client_cert_data:
return (
FileStr(user.client_cert_data) or abs_file(user.client_cert),
FileStr(user.client_key_data) or abs_file(user.client_key)
FileStr(user.client_key_data) or abs_file(user.client_key),
)
return None

Expand Down
84 changes: 50 additions & 34 deletions lightkube/config/kubeconfig.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os
import yaml
import tempfile
import base64
from typing import Dict, NamedTuple, Optional
from pathlib import Path

Expand All @@ -21,7 +19,7 @@


def to_mapping(obj_list, key, factory):
return {obj['name']: factory.from_dict(obj[key], lazy=False) for obj in obj_list}
return {obj["name"]: factory.from_dict(obj[key], lazy=False) for obj in obj_list}


DEFAULT_NAMESPACE = "default"
Expand All @@ -45,24 +43,30 @@ def abs_file(self, fname):
return fname

if self.fname is None:
raise exceptions.ConfigError(f"{fname} is relative, but kubeconfig path unknown")
raise exceptions.ConfigError(
f"{fname} is relative, but kubeconfig path unknown"
)

return self.fname.parent.joinpath(fname)


PROXY_CONF = SingleConfig(
context_name="default", context=Context(cluster="default"),
cluster=Cluster(server="http://localhost:8080")
context_name="default",
context=Context(cluster="default"),
cluster=Cluster(server="http://localhost:8080"),
)


class KubeConfig:
"""Class to represent a kubeconfig. See the specific constructors depending on your use case."""

clusters: Dict[str, Cluster]
users: Dict[str, User]
contexts: Dict[str, Context]

def __init__(self, *, clusters, contexts, users=None, current_context=None, fname=None):
def __init__(
self, *, clusters, contexts, users=None, current_context=None, fname=None
):
"""
Create the kubernetes configuration manually. Normally this constructor should not be called directly.
Use a specific constructor instead.
Expand Down Expand Up @@ -92,14 +96,16 @@ def from_dict(cls, conf: Dict, fname=None):
present inside the configuration.
"""
return cls(
current_context=conf.get('current-context'),
clusters=to_mapping(conf['clusters'], 'cluster', factory=Cluster),
contexts=to_mapping(conf['contexts'], 'context', factory=Context),
users=to_mapping(conf.get('users', []), 'user', factory=User),
fname=fname
current_context=conf.get("current-context"),
clusters=to_mapping(conf["clusters"], "cluster", factory=Cluster),
contexts=to_mapping(conf["contexts"], "context", factory=Context),
users=to_mapping(conf.get("users", []), "user", factory=User),
fname=fname,
)

def get(self, context_name=None, default: SingleConfig = None) -> Optional[SingleConfig]:
def get(
self, context_name=None, default: SingleConfig = None
) -> Optional[SingleConfig]:
"""Returns a `SingleConfig` instance, representing the configuration matching the given `context_name`.
Lightkube client will automatically call this method without parameters when an instance of `KubeConfig`
is provided.
Expand All @@ -116,17 +122,20 @@ def get(self, context_name=None, default: SingleConfig = None) -> Optional[Singl
context_name = self.current_context
if context_name is None:
if default is None:
raise exceptions.ConfigError("No current context set and no default provided")
raise exceptions.ConfigError(
"No current context set and no default provided"
)
return default
try:
ctx = self.contexts[context_name]
except KeyError:
raise exceptions.ConfigError(f"Context '{context_name}' not found")
return SingleConfig(
context_name=context_name, context=ctx,
context_name=context_name,
context=ctx,
cluster=self.clusters[ctx.cluster],
user=self.users[ctx.user] if ctx.user else None,
fname=self.fname
fname=self.fname,
)

@classmethod
Expand All @@ -144,15 +153,21 @@ def from_file(cls, fname):
return cls.from_dict(yaml.safe_load(f.read()), fname=filepath)

@classmethod
def from_one(cls, *, cluster, user=None, context_name='default', namespace=None, fname=None):
def from_one(
cls, *, cluster, user=None, context_name="default", namespace=None, fname=None
):
"""Creates an instance of the KubeConfig class from one cluster and one user configuration"""
context = Context(cluster=context_name, user=context_name if user else None, namespace=namespace)
context = Context(
cluster=context_name,
user=context_name if user else None,
namespace=namespace,
)
return cls(
clusters={context_name: cluster},
contexts={context_name: context},
users={context_name: user} if user else None,
current_context=context_name,
fname=fname
fname=fname,
)

@classmethod
Expand All @@ -179,35 +194,36 @@ def from_service_account(cls, service_account=SERVICE_ACCOUNT):

host = os.environ["KUBERNETES_SERVICE_HOST"]
port = os.environ["KUBERNETES_SERVICE_PORT"]
if ":" in host: # ipv6
if ":" in host: # ipv6
host = f"[{host}]"
return cls.from_one(
cluster=Cluster(
server=f"https://{host}:{port}",
certificate_auth=str(account_dir.joinpath("ca.crt"))
certificate_auth=str(account_dir.joinpath("ca.crt")),
),
user=User(token=token),
namespace=namespace
namespace=namespace,
)

@classmethod
def from_env(cls, service_account=SERVICE_ACCOUNT, default_config=DEFAULT_KUBECONFIG):
def from_env(
cls, service_account=SERVICE_ACCOUNT, default_config=DEFAULT_KUBECONFIG
):
"""Attempts to load the configuration automatically looking at the environment and filesystem.
The method will attempt to load a configuration using the following order:
The method will attempt to load a configuration using the following order:
* in-cluster config.
* config file defined in `KUBECONFIG` environment variable.
* configuration file present on the default location.
* in-cluster config.
* config file defined in `KUBECONFIG` environment variable.
* configuration file present on the default location.
**Parameters**
**Parameters**
* **service_account**: Allows to override the default service account directory path.
Default `/var/run/secrets/kubernetes.io/serviceaccount`.
* **default_config**: Allows to override the default configuration location. Default `~/.kube/config`.
"""
* **service_account**: Allows to override the default service account directory path.
Default `/var/run/secrets/kubernetes.io/serviceaccount`.
* **default_config**: Allows to override the default configuration location. Default `~/.kube/config`.
"""
try:
return KubeConfig.from_service_account(service_account=service_account)
except exceptions.ConfigError:
return KubeConfig.from_file(os.environ.get('KUBECONFIG', default_config))

return KubeConfig.from_file(os.environ.get("KUBECONFIG", default_config))
22 changes: 14 additions & 8 deletions lightkube/config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,22 @@ class User(DataclassDictMixIn):
username: str = None
password: str = None
token: str = None
auth_provider: Dict = field(metadata={'json': 'auth-provider'}, default=None)
client_cert: str = field(metadata={'json': 'client-certificate'}, default=None)
client_cert_data: str = field(metadata={'json': 'client-certificate-data'}, default=None)
client_key: str = field(metadata={'json': 'client-key'}, default=None)
client_key_data: str = field(metadata={'json': 'client-key-data'}, default=None)
auth_provider: Dict = field(metadata={"json": "auth-provider"}, default=None)
client_cert: str = field(metadata={"json": "client-certificate"}, default=None)
client_cert_data: str = field(
metadata={"json": "client-certificate-data"}, default=None
)
client_key: str = field(metadata={"json": "client-key"}, default=None)
client_key_data: str = field(metadata={"json": "client-key-data"}, default=None)


@dataclass
class Cluster(DataclassDictMixIn):
server: str = "http://localhost:8080"
certificate_auth: str = field(metadata={'json': 'certificate-authority'}, default=None)
certificate_auth_data: str = field(metadata={'json': 'certificate-authority-data'}, default=None)
insecure: bool = field(metadata={'json': 'insecure-skip-tls-verify'}, default=False)
certificate_auth: str = field(
metadata={"json": "certificate-authority"}, default=None
)
certificate_auth_data: str = field(
metadata={"json": "certificate-authority-data"}, default=None
)
insecure: bool = field(metadata={"json": "insecure-skip-tls-verify"}, default=False)
Loading

0 comments on commit e324b96

Please sign in to comment.