diff --git a/flux_local/manifest.py b/flux_local/manifest.py index 20f7ae3e..9cfe7e20 100644 --- a/flux_local/manifest.py +++ b/flux_local/manifest.py @@ -24,6 +24,21 @@ "ManifestException", ] +# Match a prefix of apiVersion to ensure we have the right type of object. +# We don't check specific versions for forward compatibility on upgrade. +CLUSTER_KUSTOMIZE_DOMAIN = "kustomize.toolkit.fluxcd.io" +KUSTOMIZE_DOMAIN = "kustomize.toolkit.fluxcd.io" +HELM_REPO_DOMAIN = "source.toolkit.fluxcd.io" +HELM_RELEASE_DOMAIN = "helm.toolkit.fluxcd.io" + + +def _check_version(doc: dict[str, Any], version: str) -> None: + """Assert that the resource has the specified version.""" + if not (api_version := doc.get("apiVersion")): + raise ValueError(f"Invalid object missing apiVersion: {doc}") + if not api_version.startswith(version): + raise ValueError(f"Invalid object expected '{version}': {doc}") + class HelmChart(BaseModel): """A representation of an instantiation of a chart for a HelmRelease.""" @@ -31,7 +46,7 @@ class HelmChart(BaseModel): name: str """The name of the chart within the HelmRepository.""" - version: str + version: Optional[str] = None """The version of the chart.""" repo_name: str @@ -43,6 +58,7 @@ class HelmChart(BaseModel): @classmethod def from_doc(cls, doc: dict[str, Any]) -> "HelmChart": """Parse a HelmChart from a HelmRelease resource object.""" + _check_version(doc, HELM_RELEASE_DOMAIN) if not (spec := doc.get("spec")): raise ValueError(f"Invalid {cls} missing spec: {doc}") if not (chart := spec.get("chart")): @@ -88,6 +104,7 @@ class HelmRelease(BaseModel): @classmethod def from_doc(cls, doc: dict[str, Any]) -> "HelmRelease": """Parse a HelmRelease from a kubernetes resource object.""" + _check_version(doc, HELM_RELEASE_DOMAIN) if not (metadata := doc.get("metadata")): raise ValueError(f"Invalid {cls} missing metadata: {doc}") if not (name := metadata.get("name")): @@ -123,6 +140,7 @@ class HelmRepository(BaseModel): @classmethod def from_doc(cls, doc: dict[str, Any]) -> "HelmRepository": """Parse a HelmRepository from a kubernetes resource.""" + _check_version(doc, HELM_REPO_DOMAIN) if not (metadata := doc.get("metadata")): raise ValueError(f"Invalid {cls} missing metadata: {doc}") if not (name := metadata.get("name")): @@ -164,6 +182,7 @@ class Kustomization(BaseModel): @classmethod def from_doc(cls, doc: dict[str, Any]) -> "Kustomization": """Parse a partial Kustomization from a kubernetes resource.""" + _check_version(doc, KUSTOMIZE_DOMAIN) if not (metadata := doc.get("metadata")): raise ValueError(f"Invalid {cls} missing metadata: {doc}") if not (name := metadata.get("name")): @@ -199,6 +218,7 @@ class Cluster(BaseModel): @classmethod def from_doc(cls, doc: dict[str, Any]) -> "Cluster": """Parse a partial Kustomization from a kubernetes resource.""" + _check_version(doc, CLUSTER_KUSTOMIZE_DOMAIN) if not (metadata := doc.get("metadata")): raise ValueError(f"Invalid {cls} missing metadata: {doc}") if not (name := metadata.get("name")): @@ -225,9 +245,7 @@ class Manifest(BaseModel): def parse_yaml(content: str) -> "Manifest": """Parse a serialized manifest.""" doc = next(yaml.load_all(content, Loader=yaml.Loader), None) - if not doc or "spec" not in doc: - raise ManifestException("Manifest file malformed, missing 'spec'") - return Manifest(clusters=doc["spec"]) + return Manifest.parse_obj(doc) def yaml(self) -> str: """Serialize the manifest as a yaml file. @@ -258,7 +276,7 @@ def yaml(self) -> str: ) return cast( str, - yaml.dump({"spec": data["clusters"]}, sort_keys=False, explicit_start=True), + yaml.dump(data, sort_keys=False, explicit_start=True), ) diff --git a/flux_local/repo.py b/flux_local/repo.py index 5af069d0..d27c59d0 100644 --- a/flux_local/repo.py +++ b/flux_local/repo.py @@ -1,26 +1,125 @@ -"""Library for operating on a local repo.""" +"""Library for operating on a local repo and building Manifests. +This will build a `manifest.Manifest` from a cluster repo. This follows the +pattern of building kustomizations, then reading helm releases (though it will +not evaluate the templates). The resulting `Manifest` contains all the necessary +information to do basic checks on objects in the cluster (e.g. run templates +from unit tests). + +Example usage: +``` +from flux_local import repo + +manifest = await repo.build_manifest() +for cluster in manifest: + print(f"Found cluster: {cluster.path}") + for kustomization in cluster.kustomizations: + print(f"Found kustomization: {kustomization.path}") +""" + +import logging import os +from collections.abc import Callable from functools import cache from pathlib import Path +from typing import Any import git -import yaml -from .manifest import Manifest +from . import kustomize +from .manifest import ( + CLUSTER_KUSTOMIZE_DOMAIN, + KUSTOMIZE_DOMAIN, + Cluster, + HelmRelease, + HelmRepository, + Kustomization, + Manifest, +) + +__all__ = [ + "repo_root", + "build_manifest", +] + +_LOGGER = logging.getLogger(__name__) + +CLUSTER_KUSTOMIZE_NAME = "flux-system" +CLUSTER_KUSTOMIZE_KIND = "Kustomization" +KUSTOMIZE_KIND = "Kustomization" +HELM_REPO_KIND = "HelmRepository" +HELM_RELEASE_KIND = "HelmRelease" @cache def repo_root() -> Path: """Return the local github repo path.""" - git_repo = git.Repo(os.getcwd(), search_parent_directories=True) + git_repo = git.repo.Repo(os.getcwd(), search_parent_directories=True) return Path(git_repo.git.rev_parse("--show-toplevel")) -def manifest(manifest_file: Path) -> Manifest: - """Return the contents of the manifest file from the local repo.""" - contents = manifest_file.read_text() - doc = next(yaml.load_all(contents, Loader=yaml.Loader)) - if "spec" not in doc: - raise ValueError("Manifest file malformed, missing 'spec'") - return Manifest(clusters=doc["spec"]) +def domain_filter(version: str) -> Callable[[dict[str, Any]], bool]: + """Return a yaml doc filter for specified resource version.""" + + def func(doc: dict[str, Any]) -> bool: + if api_version := doc.get("apiVersion"): + if api_version.startswith(version): + return True + return False + + return func + + +CLUSTER_KUSTOMIZE_DOMAIN_FILTER = domain_filter(CLUSTER_KUSTOMIZE_DOMAIN) +KUSTOMIZE_DOMAIN_FILTER = domain_filter(KUSTOMIZE_DOMAIN) + + +async def get_clusters(path: Path) -> list[Cluster]: + """Load Cluster objects from the specified path.""" + cmd = kustomize.grep(f"kind={CLUSTER_KUSTOMIZE_KIND}", path).grep( + f"metadata.name={CLUSTER_KUSTOMIZE_NAME}" + ) + docs = await cmd.objects() + return [ + Cluster.from_doc(doc) for doc in docs if CLUSTER_KUSTOMIZE_DOMAIN_FILTER(doc) + ] + + +async def get_kustomizations(path: Path) -> list[Kustomization]: + """Load Kustomization objects from the specified path.""" + cmd = kustomize.grep(f"kind={KUSTOMIZE_KIND}", path).grep( + f"metadata.name={CLUSTER_KUSTOMIZE_NAME}", + invert=True, + ) + docs = await cmd.objects() + return [Kustomization.from_doc(doc) for doc in docs if KUSTOMIZE_DOMAIN_FILTER(doc)] + + +async def build_manifest(root: Path | None = None) -> Manifest: + """Build a Manifest object from the local cluster. + + This will locate all Kustomizations that represent clusters, then find all + the Kustomizations within that cluster, as well as all relevant Helm + resources. + """ + if root is None: + root = repo_root() + + clusters = await get_clusters(root) + for cluster in clusters: + _LOGGER.debug("Processing cluster: %s", cluster.path) + cluster.kustomizations = await get_kustomizations( + root / cluster.path.lstrip("./") + ) + for kustomization in cluster.kustomizations: + _LOGGER.debug("Processing kustomization: %s", kustomization.path) + cmd = kustomize.build(root / kustomization.path) + kustomization.helm_repos = [ + HelmRepository.from_doc(doc) + for doc in await cmd.grep(f"kind=^{HELM_REPO_KIND}$").objects() + ] + kustomization.helm_releases = [ + HelmRelease.from_doc(doc) + for doc in await cmd.grep(f"kind=^{HELM_RELEASE_KIND}$").objects() + ] + return Manifest(clusters=clusters) diff --git a/setup.cfg b/setup.cfg index 683dc1bb..cbbd5fb1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = flux-local -version = 0.0.4 +version = 0.0.5 description = flux-local is a python library and set of tools for managing a flux gitops repository, with validation steps to help improve quality of commits, PRs, and general local testing. long_description = file: README.md long_description_content_type = text/markdown diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 9f78a840..9fdc9023 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -5,13 +5,13 @@ import pytest import yaml +from pydantic import ValidationError from flux_local.manifest import ( Cluster, HelmRelease, HelmRepository, Manifest, - ManifestException, read_manifest, write_manifest, ) @@ -48,7 +48,7 @@ def test_parse_helm_repository() -> None: async def test_read_manifest_invalid_file() -> None: """Test reading an invalid manifest file.""" - with pytest.raises(ManifestException, match="Manifest file malformed"): + with pytest.raises(ValidationError, match="validation error for Manifest"): await read_manifest(Path("/dev/null"))