From e339d39104a2e03f49749f430f55578cba1a4e95 Mon Sep 17 00:00:00 2001 From: liamfallon Date: Tue, 3 Dec 2024 09:41:03 +0000 Subject: [PATCH] Refactor task package to make mutations more visible --- pkg/task/clone.go | 2 + pkg/task/eval.go | 2 + pkg/task/generictaskhandler.go | 165 +-------------------------------- pkg/task/replace_test.go | 4 +- pkg/task/replaceresources.go | 87 +++++++++++++++++ pkg/task/update.go | 129 ++++++++++++++++++++++++++ 6 files changed, 223 insertions(+), 166 deletions(-) create mode 100644 pkg/task/replaceresources.go create mode 100644 pkg/task/update.go diff --git a/pkg/task/clone.go b/pkg/task/clone.go index f5e2ee06..5e9bbf28 100644 --- a/pkg/task/clone.go +++ b/pkg/task/clone.go @@ -31,6 +31,8 @@ import ( "k8s.io/klog/v2" ) +var _ mutation = &clonePackageMutation{} + type clonePackageMutation struct { task *api.Task diff --git a/pkg/task/eval.go b/pkg/task/eval.go index 982322e3..8aa657ac 100644 --- a/pkg/task/eval.go +++ b/pkg/task/eval.go @@ -29,6 +29,8 @@ import ( "sigs.k8s.io/kustomize/kyaml/yaml" ) +var _ mutation = &evalFunctionMutation{} + type evalFunctionMutation struct { runtime fn.FunctionRuntime runnerOptions fnruntime.RunnerOptions diff --git a/pkg/task/generictaskhandler.go b/pkg/task/generictaskhandler.go index 27b32b31..dd44655b 100644 --- a/pkg/task/generictaskhandler.go +++ b/pkg/task/generictaskhandler.go @@ -23,7 +23,6 @@ import ( configapi "github.com/nephio-project/porch/api/porchconfig/v1alpha1" "github.com/nephio-project/porch/internal/kpt/builtins" "github.com/nephio-project/porch/internal/kpt/fnruntime" - "github.com/nephio-project/porch/pkg/kpt" kptfile "github.com/nephio-project/porch/pkg/kpt/api/kptfile/v1" "github.com/nephio-project/porch/pkg/kpt/fn" "github.com/nephio-project/porch/pkg/repository" @@ -205,7 +204,7 @@ func (th *genericTaskHandler) DoPRResourceMutations(ctx context.Context, pr2Upda runnerOptions := th.runnerOptionsResolver(oldRes.GetNamespace()) mutations := []mutation{ - &mutationReplaceResources{ + &replaceResourcesMutation{ newResources: newRes, oldResources: oldRes, }, @@ -485,168 +484,6 @@ func applyResourceMutations(ctx context.Context, draft repository.PackageRevisio return applied, renderStatus, nil } -type updatePackageMutation struct { - cloneTask *api.Task - updateTask *api.Task - repoOpener repository.RepositoryOpener - referenceResolver repository.ReferenceResolver - namespace string - pkgName string -} - -func (m *updatePackageMutation) apply(ctx context.Context, resources repository.PackageResources) (repository.PackageResources, *api.TaskResult, error) { - ctx, span := tracer.Start(ctx, "updatePackageMutation::Apply", trace.WithAttributes()) - defer span.End() - - currUpstreamPkgRef, err := m.currUpstream() - if err != nil { - return repository.PackageResources{}, nil, err - } - - targetUpstream := m.updateTask.Update.Upstream - if targetUpstream.Type == api.RepositoryTypeGit || targetUpstream.Type == api.RepositoryTypeOCI { - return repository.PackageResources{}, nil, fmt.Errorf("update is not supported for non-porch upstream packages") - } - - originalResources, err := (&repository.PackageFetcher{ - RepoOpener: m.repoOpener, - ReferenceResolver: m.referenceResolver, - }).FetchResources(ctx, currUpstreamPkgRef, m.namespace) - if err != nil { - return repository.PackageResources{}, nil, fmt.Errorf("error fetching the resources for package %s with ref %+v", - m.pkgName, *currUpstreamPkgRef) - } - - upstreamRevision, err := (&repository.PackageFetcher{ - RepoOpener: m.repoOpener, - ReferenceResolver: m.referenceResolver, - }).FetchRevision(ctx, targetUpstream.UpstreamRef, m.namespace) - if err != nil { - return repository.PackageResources{}, nil, fmt.Errorf("error fetching revision for target upstream %s", targetUpstream.UpstreamRef.Name) - } - upstreamResources, err := upstreamRevision.GetResources(ctx) - if err != nil { - return repository.PackageResources{}, nil, fmt.Errorf("error fetching resources for target upstream %s", targetUpstream.UpstreamRef.Name) - } - - klog.Infof("performing pkg upgrade operation for pkg %s resource counts local[%d] original[%d] upstream[%d]", - m.pkgName, len(resources.Contents), len(originalResources.Spec.Resources), len(upstreamResources.Spec.Resources)) - - // May be have packageUpdater part of the Porch core to make it easy for testing ? - updatedResources, err := (&repository.DefaultPackageUpdater{}).Update(ctx, - resources, - repository.PackageResources{ - Contents: originalResources.Spec.Resources, - }, - repository.PackageResources{ - Contents: upstreamResources.Spec.Resources, - }) - if err != nil { - return repository.PackageResources{}, nil, fmt.Errorf("error updating the package to revision %s", targetUpstream.UpstreamRef.Name) - } - - newUpstream, newUpstreamLock, err := upstreamRevision.GetLock() - if err != nil { - return repository.PackageResources{}, nil, fmt.Errorf("error fetching the resources for package revisions %s", targetUpstream.UpstreamRef.Name) - } - if err := kpt.UpdateKptfileUpstream("", updatedResources.Contents, newUpstream, newUpstreamLock); err != nil { - return repository.PackageResources{}, nil, fmt.Errorf("failed to apply upstream lock to package %q: %w", m.pkgName, err) - } - - // ensure merge-key comment is added to newly added resources. - result, err := ensureMergeKey(ctx, updatedResources) - if err != nil { - klog.Infof("failed to add merge key comments: %v", err) - } - return result, &api.TaskResult{Task: m.updateTask}, nil -} - -// Currently assumption is that downstream packages will be forked from a porch package. -// As per current implementation, upstream package ref is stored in a new update task but this may -// change so the logic of figuring out current upstream will live in this function. -func (m *updatePackageMutation) currUpstream() (*api.PackageRevisionRef, error) { - if m.cloneTask == nil || m.cloneTask.Clone == nil { - return nil, fmt.Errorf("package %s does not have original upstream info", m.pkgName) - } - upstream := m.cloneTask.Clone.Upstream - if upstream.Type == api.RepositoryTypeGit || upstream.Type == api.RepositoryTypeOCI { - return nil, fmt.Errorf("upstream package must be porch native package. Found it to be %s", upstream.Type) - } - return upstream.UpstreamRef, nil -} - -func findCloneTask(pr *api.PackageRevision) *api.Task { - if len(pr.Spec.Tasks) == 0 { - return nil - } - firstTask := pr.Spec.Tasks[0] - if firstTask.Type == api.TaskTypeClone { - return &firstTask - } - return nil -} - -type mutationReplaceResources struct { - newResources *api.PackageRevisionResources - oldResources *api.PackageRevisionResources -} - -func (m *mutationReplaceResources) apply(ctx context.Context, resources repository.PackageResources) (repository.PackageResources, *api.TaskResult, error) { - _, span := tracer.Start(ctx, "mutationReplaceResources::Apply", trace.WithAttributes()) - defer span.End() - - patch := &api.PackagePatchTaskSpec{} - - old := resources.Contents - new, err := healConfig(old, m.newResources.Spec.Resources) - if err != nil { - return repository.PackageResources{}, nil, fmt.Errorf("failed to heal resources: %w", err) - } - - for k, newV := range new { - oldV, ok := old[k] - // New config or changed config - if !ok { - patchSpec := api.PatchSpec{ - File: k, - PatchType: api.PatchTypeCreateFile, - Contents: newV, - } - patch.Patches = append(patch.Patches, patchSpec) - } else if newV != oldV { - patchSpec, err := GeneratePatch(k, oldV, newV) - if err != nil { - return repository.PackageResources{}, nil, fmt.Errorf("error generating patch: %w", err) - } - if patchSpec.Contents == "" { - continue - } - patch.Patches = append(patch.Patches, patchSpec) - } - } - for k := range old { - // Deleted config - if _, ok := new[k]; !ok { - patchSpec := api.PatchSpec{ - File: k, - PatchType: api.PatchTypeDeleteFile, - } - patch.Patches = append(patch.Patches, patchSpec) - } - } - // If patch is empty, don't create a Task. - var taskResult *api.TaskResult - if len(patch.Patches) > 0 { - taskResult = &api.TaskResult{ - Task: &api.Task{ - Type: api.TaskTypePatch, - Patch: patch, - }, - } - } - return repository.PackageResources{Contents: new}, taskResult, nil -} - func healConfig(old, new map[string]string) (map[string]string, error) { // Copy comments from old config to new oldResources, err := (&packageReader{ diff --git a/pkg/task/replace_test.go b/pkg/task/replace_test.go index 062c8805..3acd2d31 100644 --- a/pkg/task/replace_test.go +++ b/pkg/task/replace_test.go @@ -33,7 +33,7 @@ func TestReplaceResources(t *testing.T) { input := repository.ReadPackage(t, filepath.Join("testdata", "replace")) nocomment := removeComments(t, input) - replace := &mutationReplaceResources{ + replace := &replaceResourcesMutation{ newResources: &v1alpha1.PackageRevisionResources{ Spec: v1alpha1.PackageRevisionResourcesSpec{ Resources: nocomment.Contents, @@ -48,7 +48,7 @@ func TestReplaceResources(t *testing.T) { output, _, err := replace.apply(ctx, input) if err != nil { - t.Fatalf("mutationReplaceResources.Apply failed: %v", err) + t.Fatalf("replaceResourcesMutation.Apply failed: %v", err) } if !cmp.Equal(input, output) { diff --git a/pkg/task/replaceresources.go b/pkg/task/replaceresources.go new file mode 100644 index 00000000..d3490786 --- /dev/null +++ b/pkg/task/replaceresources.go @@ -0,0 +1,87 @@ +// Copyright 2024 The kpt and Nephio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + "fmt" + + api "github.com/nephio-project/porch/api/porch/v1alpha1" + "github.com/nephio-project/porch/pkg/repository" + "go.opentelemetry.io/otel/trace" +) + +var _ mutation = &replaceResourcesMutation{} + +type replaceResourcesMutation struct { + newResources *api.PackageRevisionResources + oldResources *api.PackageRevisionResources +} + +func (m *replaceResourcesMutation) apply(ctx context.Context, resources repository.PackageResources) (repository.PackageResources, *api.TaskResult, error) { + _, span := tracer.Start(ctx, "mutationReplaceResources::Apply", trace.WithAttributes()) + defer span.End() + + patch := &api.PackagePatchTaskSpec{} + + old := resources.Contents + new, err := healConfig(old, m.newResources.Spec.Resources) + if err != nil { + return repository.PackageResources{}, nil, fmt.Errorf("failed to heal resources: %w", err) + } + + for k, newV := range new { + oldV, ok := old[k] + // New config or changed config + if !ok { + patchSpec := api.PatchSpec{ + File: k, + PatchType: api.PatchTypeCreateFile, + Contents: newV, + } + patch.Patches = append(patch.Patches, patchSpec) + } else if newV != oldV { + patchSpec, err := GeneratePatch(k, oldV, newV) + if err != nil { + return repository.PackageResources{}, nil, fmt.Errorf("error generating patch: %w", err) + } + if patchSpec.Contents == "" { + continue + } + patch.Patches = append(patch.Patches, patchSpec) + } + } + for k := range old { + // Deleted config + if _, ok := new[k]; !ok { + patchSpec := api.PatchSpec{ + File: k, + PatchType: api.PatchTypeDeleteFile, + } + patch.Patches = append(patch.Patches, patchSpec) + } + } + // If patch is empty, don't create a Task. + var taskResult *api.TaskResult + if len(patch.Patches) > 0 { + taskResult = &api.TaskResult{ + Task: &api.Task{ + Type: api.TaskTypePatch, + Patch: patch, + }, + } + } + return repository.PackageResources{Contents: new}, taskResult, nil +} diff --git a/pkg/task/update.go b/pkg/task/update.go new file mode 100644 index 00000000..d0a3fea7 --- /dev/null +++ b/pkg/task/update.go @@ -0,0 +1,129 @@ +// Copyright 2022, 2024 The kpt and Nephio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + "fmt" + + api "github.com/nephio-project/porch/api/porch/v1alpha1" + "github.com/nephio-project/porch/pkg/kpt" + "github.com/nephio-project/porch/pkg/repository" + "go.opentelemetry.io/otel/trace" + "k8s.io/klog/v2" +) + +var _ mutation = &updatePackageMutation{} + +type updatePackageMutation struct { + cloneTask *api.Task + updateTask *api.Task + repoOpener repository.RepositoryOpener + referenceResolver repository.ReferenceResolver + namespace string + pkgName string +} + +func (m *updatePackageMutation) apply(ctx context.Context, resources repository.PackageResources) (repository.PackageResources, *api.TaskResult, error) { + ctx, span := tracer.Start(ctx, "updatePackageMutation::Apply", trace.WithAttributes()) + defer span.End() + + currUpstreamPkgRef, err := m.currUpstream() + if err != nil { + return repository.PackageResources{}, nil, err + } + + targetUpstream := m.updateTask.Update.Upstream + if targetUpstream.Type == api.RepositoryTypeGit || targetUpstream.Type == api.RepositoryTypeOCI { + return repository.PackageResources{}, nil, fmt.Errorf("update is not supported for non-porch upstream packages") + } + + originalResources, err := (&repository.PackageFetcher{ + RepoOpener: m.repoOpener, + ReferenceResolver: m.referenceResolver, + }).FetchResources(ctx, currUpstreamPkgRef, m.namespace) + if err != nil { + return repository.PackageResources{}, nil, fmt.Errorf("error fetching the resources for package %s with ref %+v", + m.pkgName, *currUpstreamPkgRef) + } + + upstreamRevision, err := (&repository.PackageFetcher{ + RepoOpener: m.repoOpener, + ReferenceResolver: m.referenceResolver, + }).FetchRevision(ctx, targetUpstream.UpstreamRef, m.namespace) + if err != nil { + return repository.PackageResources{}, nil, fmt.Errorf("error fetching revision for target upstream %s", targetUpstream.UpstreamRef.Name) + } + upstreamResources, err := upstreamRevision.GetResources(ctx) + if err != nil { + return repository.PackageResources{}, nil, fmt.Errorf("error fetching resources for target upstream %s", targetUpstream.UpstreamRef.Name) + } + + klog.Infof("performing pkg upgrade operation for pkg %s resource counts local[%d] original[%d] upstream[%d]", + m.pkgName, len(resources.Contents), len(originalResources.Spec.Resources), len(upstreamResources.Spec.Resources)) + + // May be have packageUpdater part of the Porch core to make it easy for testing ? + updatedResources, err := (&repository.DefaultPackageUpdater{}).Update(ctx, + resources, + repository.PackageResources{ + Contents: originalResources.Spec.Resources, + }, + repository.PackageResources{ + Contents: upstreamResources.Spec.Resources, + }) + if err != nil { + return repository.PackageResources{}, nil, fmt.Errorf("error updating the package to revision %s", targetUpstream.UpstreamRef.Name) + } + + newUpstream, newUpstreamLock, err := upstreamRevision.GetLock() + if err != nil { + return repository.PackageResources{}, nil, fmt.Errorf("error fetching the resources for package revisions %s", targetUpstream.UpstreamRef.Name) + } + if err := kpt.UpdateKptfileUpstream("", updatedResources.Contents, newUpstream, newUpstreamLock); err != nil { + return repository.PackageResources{}, nil, fmt.Errorf("failed to apply upstream lock to package %q: %w", m.pkgName, err) + } + + // ensure merge-key comment is added to newly added resources. + result, err := ensureMergeKey(ctx, updatedResources) + if err != nil { + klog.Infof("failed to add merge key comments: %v", err) + } + return result, &api.TaskResult{Task: m.updateTask}, nil +} + +// Currently assumption is that downstream packages will be forked from a porch package. +// As per current implementation, upstream package ref is stored in a new update task but this may +// change so the logic of figuring out current upstream will live in this function. +func (m *updatePackageMutation) currUpstream() (*api.PackageRevisionRef, error) { + if m.cloneTask == nil || m.cloneTask.Clone == nil { + return nil, fmt.Errorf("package %s does not have original upstream info", m.pkgName) + } + upstream := m.cloneTask.Clone.Upstream + if upstream.Type == api.RepositoryTypeGit || upstream.Type == api.RepositoryTypeOCI { + return nil, fmt.Errorf("upstream package must be porch native package. Found it to be %s", upstream.Type) + } + return upstream.UpstreamRef, nil +} + +func findCloneTask(pr *api.PackageRevision) *api.Task { + if len(pr.Spec.Tasks) == 0 { + return nil + } + firstTask := pr.Spec.Tasks[0] + if firstTask.Type == api.TaskTypeClone { + return &firstTask + } + return nil +}