Skip to content

Commit

Permalink
Remove complexity from multi-cluster diff
Browse files Browse the repository at this point in the history
  • Loading branch information
allenporter committed Dec 21, 2023
1 parent f5efe17 commit 8e72c0e
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 117 deletions.
5 changes: 0 additions & 5 deletions flux_local/git_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ class ResourceVisitor:

func: Callable[
[
Path,
Path,
Kustomization | HelmRelease | HelmRepository | ClusterPolicy,
kustomize.Kustomize | None,
Expand Down Expand Up @@ -546,7 +545,6 @@ async def build_kustomization(

if kustomization_selector.visitor:
await kustomization_selector.visitor.func(
cluster_path,
Path(kustomization.path),
kustomization,
cmd,
Expand Down Expand Up @@ -675,7 +673,6 @@ async def update_kustomization(cluster: Cluster) -> None:
for kustomization in cluster.kustomizations:
for helm_repo in kustomization.helm_repos:
await selector.helm_repo.visitor.func(
Path(cluster.path),
Path(kustomization.path),
helm_repo,
None,
Expand All @@ -685,7 +682,6 @@ async def update_kustomization(cluster: Cluster) -> None:
for kustomization in cluster.kustomizations:
for helm_release in kustomization.helm_releases:
await selector.helm_release.visitor.func(
Path(cluster.path),
Path(kustomization.path),
helm_release,
None,
Expand All @@ -695,7 +691,6 @@ async def update_kustomization(cluster: Cluster) -> None:
for kustomization in cluster.kustomizations:
for cluster_policy in kustomization.cluster_policies:
await selector.cluster_policy.visitor.func(
Path(cluster.path),
Path(kustomization.path),
cluster_policy,
None,
Expand Down
46 changes: 17 additions & 29 deletions flux_local/tool/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,20 +162,19 @@ def perform_yaml_diff(

def get_helm_release_diff_keys(
a: ObjectOutput, b: ObjectOutput
) -> dict[str, list[ResourceKey]]:
) -> list[ResourceKey]:
"""Return HelmRelease resource keys with diffs, by cluster."""
result: dict[str, list[ResourceKey]] = {}
results: list[ResourceKey] = []
for kustomization_key in _unique_keys(a.content, b.content):
cluster_path = kustomization_key.cluster_path
_LOGGER.debug("Diffing results for Kustomization %s", kustomization_key)
a_resources = a.content.get(kustomization_key, {})
b_resources = b.content.get(kustomization_key, {})
for resource_key in _unique_keys(a_resources, b_resources):
if resource_key.kind != "HelmRelease":
continue
if a_resources.get(resource_key) != b_resources.get(resource_key):
result[cluster_path] = result.get(cluster_path, []) + [resource_key]
return result
results.append(resource_key)
return results


def add_diff_flags(args: ArgumentParser) -> None:
Expand Down Expand Up @@ -370,31 +369,20 @@ async def run( # type: ignore[no-untyped-def]
# This avoid building unnecessary resources and churn from things like
# random secret generation.
diff_resource_keys = get_helm_release_diff_keys(orig_content, content)
cluster_paths = {
kustomization_key.cluster_path
for kustomization_key in set(orig_content.content.keys())
| set(content.content.keys())
diff_names = {
f"{resource_key.namespace}/{resource_key.name}"
for resource_key in diff_resource_keys
}
for cluster_path in cluster_paths:
diff_keys = diff_resource_keys.get(cluster_path, [])
diff_names = {
f"{resource_key.namespace}/{resource_key.name}"
for resource_key in diff_keys
}
if cluster_path in helm_visitor.releases:
releases = [
release
for release in helm_visitor.releases[cluster_path]
if f"{release.namespace}/{release.name}" in diff_names
]
helm_visitor.releases[cluster_path] = releases
if cluster_path in orig_helm_visitor.releases:
releases = [
release
for release in orig_helm_visitor.releases[cluster_path]
if f"{release.namespace}/{release.name}" in diff_names
]
orig_helm_visitor.releases[cluster_path] = releases
helm_visitor.releases = [
release
for release in helm_visitor.releases
if release.namespaced_name in diff_names
]
orig_helm_visitor.releases = [
release
for release in orig_helm_visitor.releases
if release.namespaced_name in diff_names
]

helm_content = ObjectOutput(strip_attrs)
orig_helm_content = ObjectOutput(strip_attrs)
Expand Down
74 changes: 15 additions & 59 deletions flux_local/tool/visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,12 @@
class ResourceKey:
"""Key for a Kustomization object output."""

cluster_path: str
kustomization_path: str
kind: str
namespace: str
name: str

def __post_init__(self) -> None:
if self.cluster_path.startswith("/"):
raise AssertionError(
f"Expected cluster_path as relative: {self.cluster_path}"
)
if self.kustomization_path.startswith("/"):
raise AssertionError(
f"Expected kustomization_path as relative: {self.kustomization_path}"
Expand All @@ -63,9 +58,6 @@ def label(self) -> str:
if self.kustomization_path and self.kustomization_path != ".":
parts.append(self.kustomization_path)
parts.append(" ")
elif self.cluster_path:
parts.append(self.cluster_path)
parts.append(" ")
parts.append(self.compact_label)
return "".join(parts)

Expand Down Expand Up @@ -95,7 +87,6 @@ def visitor(self) -> git_repo.ResourceVisitor:
@abstractmethod
async def call_async(
self,
cluster_path: pathlib.Path,
kustomization_path: pathlib.Path,
doc: ResourceType,
cmd: Kustomize | None,
Expand All @@ -104,12 +95,10 @@ async def call_async(

def key_func(
self,
cluster_path: pathlib.Path,
kustomization_path: pathlib.Path,
resource: ResourceType,
) -> ResourceKey:
return ResourceKey(
cluster_path=str(cluster_path),
kustomization_path=str(kustomization_path),
kind=resource.__class__.__name__,
namespace=resource.namespace or "",
Expand All @@ -126,7 +115,6 @@ def __init__(self) -> None:

async def call_async(
self,
cluster_path: pathlib.Path,
kustomization_path: pathlib.Path,
doc: ResourceType,
cmd: Kustomize | None,
Expand All @@ -137,7 +125,7 @@ async def call_async(
lines = content.split("\n")
if lines[0] != "---":
lines.insert(0, "---")
self.content[self.key_func(cluster_path, kustomization_path, doc)] = lines
self.content[self.key_func(kustomization_path, doc)] = lines


def strip_attrs(metadata: dict[str, Any], strip_attributes: list[str]) -> None:
Expand All @@ -159,14 +147,11 @@ class ImageOutput(ResourceOutput):

def __init__(self) -> None:
"""Initialize ObjectOutput."""
# Map of kustomizations to the map of built objects as lines of the yaml string
self.content: dict[ResourceKey, dict[ResourceKey, list[str]]] = {}
self.image_visitor = image.ImageVisitor()
self.repo_visitor = self.image_visitor.repo_visitor()

async def call_async(
self,
cluster_path: pathlib.Path,
kustomization_path: pathlib.Path,
doc: ResourceType,
cmd: Kustomize | None,
Expand Down Expand Up @@ -203,7 +188,6 @@ def __init__(self, strip_attributes: list[str] | None) -> None:

async def call_async(
self,
cluster_path: pathlib.Path,
kustomization_path: pathlib.Path,
doc: ResourceType,
cmd: Kustomize | None,
Expand Down Expand Up @@ -232,7 +216,6 @@ async def call_async(
strip_attrs(meta, self.strip_attributes)
resource_key = ResourceKey(
kind=kind,
cluster_path=str(cluster_path),
kustomization_path=str(kustomization_path),
namespace=metadata.get("namespace", doc.namespace),
name=metadata.get("name", ""),
Expand All @@ -242,73 +225,67 @@ async def call_async(
lines.insert(0, "---")
contents[resource_key] = lines
self.content[
self.key_func(cluster_path, kustomization_path, doc)
self.key_func(kustomization_path, doc)
] = contents


async def inflate_release(
cluster_path: pathlib.Path,
helm: Helm,
release: HelmRelease,
visitor: git_repo.ResourceVisitor,
options: Options,
) -> None:
cmd = await helm.template(release, options)
# We can ignore the Kustomiation path since we're essentially grouping by cluster
await visitor.func(cluster_path, pathlib.Path(""), release, cmd)
await visitor.func(pathlib.Path(""), release, cmd)


class HelmVisitor:
"""Helper that visits Helm related objects and handles inflation."""

def __init__(self) -> None:
"""Initialize KustomizationContentOutput."""
self.repos: dict[str, list[HelmRepository]] = {}
self.releases: dict[str, list[HelmRelease]] = {}
self.repos: list[HelmRepository] = []
self.releases: list[HelmRelease] = []

def active_repos(self, cluster_path: str) -> list[HelmRepository]:
@property
def active_repos(self) -> list[HelmRepository]:
"""Return HelpRepositories referenced by a HelmRelease."""
repo_keys: set[str] = {
f"{release.chart.repo_namespace}-{release.chart.repo_name}"
for release in self.releases.get(cluster_path, [])
for release in self.releases
}
return [
repo
for repo in self.repos.get(cluster_path, [])
for repo in self.repos
if repo.repo_name in repo_keys
]

def repo_visitor(self) -> git_repo.ResourceVisitor:
"""Return a git_repo.ResourceVisitor that points to this object."""

async def add_repo(
cluster_path: pathlib.Path,
kustomization_path: pathlib.Path,
doc: ResourceType,
cmd: Kustomize | None,
) -> None:
if not isinstance(doc, HelmRepository):
raise ValueError(f"Expected HelmRepository: {doc}")
self.repos[str(cluster_path)] = self.repos.get(str(cluster_path), []) + [
doc
]
self.repos.append(doc)

return git_repo.ResourceVisitor(func=add_repo)

def release_visitor(self) -> git_repo.ResourceVisitor:
"""Return a git_repo.ResourceVisitor that points to this object."""

async def add_release(
cluster_path: pathlib.Path,
kustomization_path: pathlib.Path,
doc: ResourceType,
cmd: Kustomize | None,
) -> None:
if not isinstance(doc, HelmRelease):
raise ValueError(f"Expected HelmRelease: {doc}")
self.releases[str(cluster_path)] = self.releases.get(
str(cluster_path), []
) + [doc]
self.releases.append(doc)

return git_repo.ResourceVisitor(func=add_release)

Expand All @@ -319,43 +296,22 @@ async def inflate(
options: Options,
) -> None:
"""Expand and notify about HelmReleases discovered."""
cluster_paths = set(list(self.releases)) | set(list(self.repos))
tasks = [
self.inflate_cluster(
helm_cache_dir,
pathlib.Path(cluster_path),
visitor,
options,
)
for cluster_path in cluster_paths
]
_LOGGER.debug("Waiting for cluster inflation to complete")
await asyncio.gather(*tasks)

async def inflate_cluster(
self,
helm_cache_dir: pathlib.Path,
cluster_path: pathlib.Path,
visitor: git_repo.ResourceVisitor,
options: Options,
) -> None:
_LOGGER.debug("Inflating Helm charts in cluster %s", cluster_path)
_LOGGER.debug("Inflating Helm charts in cluster")
if not self.releases:
return
with tempfile.TemporaryDirectory() as tmp_dir:
helm = Helm(pathlib.Path(tmp_dir), helm_cache_dir)
if active_repos := self.active_repos(str(cluster_path)):
if active_repos := self.active_repos:
helm.add_repos(active_repos)
await helm.update()
tasks = [
inflate_release(
cluster_path,
helm,
release,
visitor,
options,
)
for release in self.releases.get(str(cluster_path), [])
for release in self.releases
]
_LOGGER.debug("Waiting for tasks to inflate %s", cluster_path)
_LOGGER.debug("Waiting for inflate tasks to complete")
await asyncio.gather(*tasks)
Loading

0 comments on commit 8e72c0e

Please sign in to comment.