Skip to content

Commit

Permalink
Build manifest from local cluster objects (#23)
Browse files Browse the repository at this point in the history
Add a `builder` module that can build a manifest from local cluster
objects using `kustomize`.
  • Loading branch information
allenporter authored Feb 6, 2023
1 parent bb9b1eb commit 397bf8b
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 19 deletions.
28 changes: 23 additions & 5 deletions flux_local/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,29 @@
"ManifestException",
]

# Match a prefix of apiVersion to ensure we have the right type of object.
# We don't check specific versions for forward compatibility on upgrade.
CLUSTER_KUSTOMIZE_DOMAIN = "kustomize.toolkit.fluxcd.io"
KUSTOMIZE_DOMAIN = "kustomize.toolkit.fluxcd.io"
HELM_REPO_DOMAIN = "source.toolkit.fluxcd.io"
HELM_RELEASE_DOMAIN = "helm.toolkit.fluxcd.io"


def _check_version(doc: dict[str, Any], version: str) -> None:
"""Assert that the resource has the specified version."""
if not (api_version := doc.get("apiVersion")):
raise ValueError(f"Invalid object missing apiVersion: {doc}")
if not api_version.startswith(version):
raise ValueError(f"Invalid object expected '{version}': {doc}")


class HelmChart(BaseModel):
"""A representation of an instantiation of a chart for a HelmRelease."""

name: str
"""The name of the chart within the HelmRepository."""

version: str
version: Optional[str] = None
"""The version of the chart."""

repo_name: str
Expand All @@ -43,6 +58,7 @@ class HelmChart(BaseModel):
@classmethod
def from_doc(cls, doc: dict[str, Any]) -> "HelmChart":
"""Parse a HelmChart from a HelmRelease resource object."""
_check_version(doc, HELM_RELEASE_DOMAIN)
if not (spec := doc.get("spec")):
raise ValueError(f"Invalid {cls} missing spec: {doc}")
if not (chart := spec.get("chart")):
Expand Down Expand Up @@ -88,6 +104,7 @@ class HelmRelease(BaseModel):
@classmethod
def from_doc(cls, doc: dict[str, Any]) -> "HelmRelease":
"""Parse a HelmRelease from a kubernetes resource object."""
_check_version(doc, HELM_RELEASE_DOMAIN)
if not (metadata := doc.get("metadata")):
raise ValueError(f"Invalid {cls} missing metadata: {doc}")
if not (name := metadata.get("name")):
Expand Down Expand Up @@ -123,6 +140,7 @@ class HelmRepository(BaseModel):
@classmethod
def from_doc(cls, doc: dict[str, Any]) -> "HelmRepository":
"""Parse a HelmRepository from a kubernetes resource."""
_check_version(doc, HELM_REPO_DOMAIN)
if not (metadata := doc.get("metadata")):
raise ValueError(f"Invalid {cls} missing metadata: {doc}")
if not (name := metadata.get("name")):
Expand Down Expand Up @@ -164,6 +182,7 @@ class Kustomization(BaseModel):
@classmethod
def from_doc(cls, doc: dict[str, Any]) -> "Kustomization":
"""Parse a partial Kustomization from a kubernetes resource."""
_check_version(doc, KUSTOMIZE_DOMAIN)
if not (metadata := doc.get("metadata")):
raise ValueError(f"Invalid {cls} missing metadata: {doc}")
if not (name := metadata.get("name")):
Expand Down Expand Up @@ -199,6 +218,7 @@ class Cluster(BaseModel):
@classmethod
def from_doc(cls, doc: dict[str, Any]) -> "Cluster":
"""Parse a partial Kustomization from a kubernetes resource."""
_check_version(doc, CLUSTER_KUSTOMIZE_DOMAIN)
if not (metadata := doc.get("metadata")):
raise ValueError(f"Invalid {cls} missing metadata: {doc}")
if not (name := metadata.get("name")):
Expand All @@ -225,9 +245,7 @@ class Manifest(BaseModel):
def parse_yaml(content: str) -> "Manifest":
"""Parse a serialized manifest."""
doc = next(yaml.load_all(content, Loader=yaml.Loader), None)
if not doc or "spec" not in doc:
raise ManifestException("Manifest file malformed, missing 'spec'")
return Manifest(clusters=doc["spec"])
return Manifest.parse_obj(doc)

def yaml(self) -> str:
"""Serialize the manifest as a yaml file.
Expand Down Expand Up @@ -258,7 +276,7 @@ def yaml(self) -> str:
)
return cast(
str,
yaml.dump({"spec": data["clusters"]}, sort_keys=False, explicit_start=True),
yaml.dump(data, sort_keys=False, explicit_start=True),
)


Expand Down
121 changes: 110 additions & 11 deletions flux_local/repo.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,125 @@
"""Library for operating on a local repo."""
"""Library for operating on a local repo and building Manifests.
This will build a `manifest.Manifest` from a cluster repo. This follows the
pattern of building kustomizations, then reading helm releases (though it will
not evaluate the templates). The resulting `Manifest` contains all the necessary
information to do basic checks on objects in the cluster (e.g. run templates
from unit tests).
Example usage:
```
from flux_local import repo
manifest = await repo.build_manifest()
for cluster in manifest:
print(f"Found cluster: {cluster.path}")
for kustomization in cluster.kustomizations:
print(f"Found kustomization: {kustomization.path}")
"""

import logging
import os
from collections.abc import Callable
from functools import cache
from pathlib import Path
from typing import Any

import git
import yaml

from .manifest import Manifest
from . import kustomize
from .manifest import (
CLUSTER_KUSTOMIZE_DOMAIN,
KUSTOMIZE_DOMAIN,
Cluster,
HelmRelease,
HelmRepository,
Kustomization,
Manifest,
)

__all__ = [
"repo_root",
"build_manifest",
]

_LOGGER = logging.getLogger(__name__)

CLUSTER_KUSTOMIZE_NAME = "flux-system"
CLUSTER_KUSTOMIZE_KIND = "Kustomization"
KUSTOMIZE_KIND = "Kustomization"
HELM_REPO_KIND = "HelmRepository"
HELM_RELEASE_KIND = "HelmRelease"


@cache
def repo_root() -> Path:
"""Return the local github repo path."""
git_repo = git.Repo(os.getcwd(), search_parent_directories=True)
git_repo = git.repo.Repo(os.getcwd(), search_parent_directories=True)
return Path(git_repo.git.rev_parse("--show-toplevel"))


def manifest(manifest_file: Path) -> Manifest:
"""Return the contents of the manifest file from the local repo."""
contents = manifest_file.read_text()
doc = next(yaml.load_all(contents, Loader=yaml.Loader))
if "spec" not in doc:
raise ValueError("Manifest file malformed, missing 'spec'")
return Manifest(clusters=doc["spec"])
def domain_filter(version: str) -> Callable[[dict[str, Any]], bool]:
"""Return a yaml doc filter for specified resource version."""

def func(doc: dict[str, Any]) -> bool:
if api_version := doc.get("apiVersion"):
if api_version.startswith(version):
return True
return False

return func


CLUSTER_KUSTOMIZE_DOMAIN_FILTER = domain_filter(CLUSTER_KUSTOMIZE_DOMAIN)
KUSTOMIZE_DOMAIN_FILTER = domain_filter(KUSTOMIZE_DOMAIN)


async def get_clusters(path: Path) -> list[Cluster]:
"""Load Cluster objects from the specified path."""
cmd = kustomize.grep(f"kind={CLUSTER_KUSTOMIZE_KIND}", path).grep(
f"metadata.name={CLUSTER_KUSTOMIZE_NAME}"
)
docs = await cmd.objects()
return [
Cluster.from_doc(doc) for doc in docs if CLUSTER_KUSTOMIZE_DOMAIN_FILTER(doc)
]


async def get_kustomizations(path: Path) -> list[Kustomization]:
"""Load Kustomization objects from the specified path."""
cmd = kustomize.grep(f"kind={KUSTOMIZE_KIND}", path).grep(
f"metadata.name={CLUSTER_KUSTOMIZE_NAME}",
invert=True,
)
docs = await cmd.objects()
return [Kustomization.from_doc(doc) for doc in docs if KUSTOMIZE_DOMAIN_FILTER(doc)]


async def build_manifest(root: Path | None = None) -> Manifest:
"""Build a Manifest object from the local cluster.
This will locate all Kustomizations that represent clusters, then find all
the Kustomizations within that cluster, as well as all relevant Helm
resources.
"""
if root is None:
root = repo_root()

clusters = await get_clusters(root)
for cluster in clusters:
_LOGGER.debug("Processing cluster: %s", cluster.path)
cluster.kustomizations = await get_kustomizations(
root / cluster.path.lstrip("./")
)
for kustomization in cluster.kustomizations:
_LOGGER.debug("Processing kustomization: %s", kustomization.path)
cmd = kustomize.build(root / kustomization.path)
kustomization.helm_repos = [
HelmRepository.from_doc(doc)
for doc in await cmd.grep(f"kind=^{HELM_REPO_KIND}$").objects()
]
kustomization.helm_releases = [
HelmRelease.from_doc(doc)
for doc in await cmd.grep(f"kind=^{HELM_RELEASE_KIND}$").objects()
]
return Manifest(clusters=clusters)
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = flux-local
version = 0.0.4
version = 0.0.5
description = flux-local is a python library and set of tools for managing a flux gitops repository, with validation steps to help improve quality of commits, PRs, and general local testing.
long_description = file: README.md
long_description_content_type = text/markdown
Expand Down
4 changes: 2 additions & 2 deletions tests/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

import pytest
import yaml
from pydantic import ValidationError

from flux_local.manifest import (
Cluster,
HelmRelease,
HelmRepository,
Manifest,
ManifestException,
read_manifest,
write_manifest,
)
Expand Down Expand Up @@ -48,7 +48,7 @@ def test_parse_helm_repository() -> None:

async def test_read_manifest_invalid_file() -> None:
"""Test reading an invalid manifest file."""
with pytest.raises(ManifestException, match="Manifest file malformed"):
with pytest.raises(ValidationError, match="validation error for Manifest"):
await read_manifest(Path("/dev/null"))


Expand Down

0 comments on commit 397bf8b

Please sign in to comment.