From 11b79bb6636ee627fca3cf044de6c4f582990252 Mon Sep 17 00:00:00 2001 From: Srikumar Venugopal Date: Tue, 21 May 2024 18:47:31 +0100 Subject: [PATCH] Fix repeating mount entries (#361) --- .../admissioncontroller/mutatingwebhook.go | 23 +++-- .../mutatingwebhook_test.go | 89 ++++++++++++++++--- .../controllers/datasetinternal_controller.go | 3 +- src/dataset-operator/go.mod | 3 +- src/dataset-operator/go.sum | 1 + src/dataset-operator/testing/wrapper.go | 64 +++++++++++++ 6 files changed, 164 insertions(+), 19 deletions(-) diff --git a/src/dataset-operator/admissioncontroller/mutatingwebhook.go b/src/dataset-operator/admissioncontroller/mutatingwebhook.go index 98f0456e..33d6901f 100644 --- a/src/dataset-operator/admissioncontroller/mutatingwebhook.go +++ b/src/dataset-operator/admissioncontroller/mutatingwebhook.go @@ -12,11 +12,12 @@ import ( "strings" datasetsv1alpha1 "github.com/datashim-io/datashim/src/dataset-operator/api/v1alpha1" + "github.com/go-logr/logr" jsonpatch "gomodules.xyz/jsonpatch/v2" + admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -28,7 +29,7 @@ const ( ) var ( - log = ctrl.Log.WithName("datashim-webhook") + log = logf.Log.WithName("datashim-webhook") ) //following the kubebuilder example for the pod mutator @@ -41,10 +42,13 @@ type DatasetPodMutator struct { func (m *DatasetPodMutator) Handle(ctx context.Context, req admission.Request) admission.Response { // Mutate mutates - - log = logf.FromContext(ctx) + log = log.WithValues("admission pod request", req) log.V(1).Info("webhook received", "request", req) + if req.Operation != admissionv1.Create { + return admission.Allowed(fmt.Sprintf("No Pod mutation required for operation %v.", req.Operation)) + } + pod := &corev1.Pod{} err := m.Decoder.Decode(req, pod) @@ -134,6 +138,7 @@ func (d *DatasetInput) String() string { func DatasetInputFromPod(pod *corev1.Pod) (map[int]*DatasetInput, error) { // Format is {"id": {"index": , "useas": mount/configmap} + //log = log.WithName("dataset-label-processing") log.V(1).Info("Pod labels", "labels", pod.Labels) datasets := map[int]*DatasetInput{} @@ -243,6 +248,7 @@ func RetrieveDatasetsFromAPIServer(ctx context.Context, client client.Client, po } func PatchPodWithDatasetLabels(pod *corev1.Pod, datasets map[int]*DatasetInput) ([]jsonpatch.JsonPatchOperation, error) { + //log = log.WithName("pod-patcher") patchops := []jsonpatch.JsonPatchOperation{} if len(datasets) == 0 { @@ -304,7 +310,7 @@ func PatchPodWithDatasetLabels(pod *corev1.Pod, datasets map[int]*DatasetInput) if len(datasets_tomount) > 0 { log.V(1).Info("Patching volumes to Pod Spec", "datasets", datasets_tomount) - patch_ds := patchPodSpecWithDatasetPVCs(pod, datasets_tomount) + patch_ds := patchPodSpecWithDatasetPVCs(pod, datasets_tomount, log) patchops = append(patchops, patch_ds...) } @@ -321,7 +327,7 @@ func PatchPodWithDatasetLabels(pod *corev1.Pod, datasets map[int]*DatasetInput) return patchops, nil } -func patchPodSpecWithDatasetPVCs(pod *corev1.Pod, datasets map[int]*DatasetInput) (patches []jsonpatch.JsonPatchOperation) { +func patchPodSpecWithDatasetPVCs(pod *corev1.Pod, datasets map[int]*DatasetInput, log logr.Logger) (patches []jsonpatch.JsonPatchOperation) { patches = []jsonpatch.JsonPatchOperation{} vol_id := len(pod.Spec.Volumes) @@ -401,8 +407,9 @@ func patchContainersWithDatasetVolumes(pod *corev1.Pod, datasets map[int]*Datase mount_idx := len(mounts) for o := range order { - exists, _ := in_array(datasets[o], mount_names) + exists, _ := in_array(datasets[o].name, mount_names) if !exists { + log.V(1).Info("Dataset is not already mounted", "dataset", datasets[o], "pod", pod.Name) patch := jsonpatch.JsonPatchOperation{ Operation: "add", Path: "/spec/" + container_typ + "/" + fmt.Sprint(container_idx) + "/volumeMounts/" + fmt.Sprint(mount_idx), @@ -413,6 +420,8 @@ func patchContainersWithDatasetVolumes(pod *corev1.Pod, datasets map[int]*Datase } patchOps = append(patchOps, patch) mount_idx += 1 + } else { + log.V(1).Info("Dataset is already mounted", "dataset", datasets[o], "pod", pod.Name) } } } diff --git a/src/dataset-operator/admissioncontroller/mutatingwebhook_test.go b/src/dataset-operator/admissioncontroller/mutatingwebhook_test.go index cb4f106a..f262eeb3 100644 --- a/src/dataset-operator/admissioncontroller/mutatingwebhook_test.go +++ b/src/dataset-operator/admissioncontroller/mutatingwebhook_test.go @@ -1,6 +1,9 @@ package admissioncontroller import ( + "context" + "encoding/json" + "fmt" "os" "path/filepath" @@ -8,16 +11,22 @@ import ( testing "github.com/datashim-io/datashim/src/dataset-operator/testing" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "go.uber.org/zap/zapcore" jsonpatch "gomodules.xyz/jsonpatch/v2" admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) +var cfg *rest.Config +var k8sClient client.Client var testEnv *envtest.Environment type testPodLabels struct { @@ -26,7 +35,7 @@ type testPodLabels struct { } var _ = BeforeSuite(func() { - logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.Level(zapcore.DebugLevel), zap.UseDevMode(true))) By("bootstrapping test environment") use_existing_cluster := true @@ -52,7 +61,7 @@ var _ = BeforeSuite(func() { //+kubebuilder:scaffold:scheme - k8sClient, err := client.New(cfg, client.Options{Scheme: scheme.Scheme}) + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) Expect(err).NotTo(HaveOccurred()) Expect(k8sClient).NotTo(BeNil()) @@ -62,6 +71,7 @@ var _ = DescribeTable("Pod is mutated correctly", pod := tc.makeInputPodSpec() datasets, err := DatasetInputFromPod(pod) + Expect(err).ShouldNot(HaveOccurred()) Expect(PatchPodWithDatasetLabels(pod, datasets)). @@ -227,6 +237,27 @@ var _ = DescribeTable("Pod is mutated correctly", return patchArray }, }), + Entry("Pod with 0 volumes, 1 dataset label, override mount point -> 1 patch with the overridden mountpoint", &testPodLabels{ + makeInputPodSpec: func() *corev1.Pod { + inputPod := testing.MakePod("test-1", "test"). + AddLabelToPodMetadata("dataset.0.id", "testds"). + AddLabelToPodMetadata("dataset.0.useas", "mount"). + AddContainerToPod(testing.MakeContainer("foo"). + AddVolumeMount("/mount/testds", "testds").Obj()). + Obj() + return &inputPod + }, + makeOutputPatchOperations: func() []jsonpatch.JsonPatchOperation { + patchArray := []jsonpatch.JsonPatchOperation{ + testing.MakeJSONPatchOperation(). + SetOperation("add"). + SetVolumeasPath(0). + SetPVCasValue("testds"). + Obj(), + } + return patchArray + }, + }), Entry("Pod with 1 volumes, 1 dataset label, useas configmap -> 1 configmap", &testPodLabels{ makeInputPodSpec: func() *corev1.Pod { inputPod := testing.MakePod("test-1", "test"). @@ -313,28 +344,66 @@ var _ = DescribeTable("Pod is mutated correctly", return []jsonpatch.JsonPatchOperation{} }, }), + + Entry("Pod with 1 volumes, different mountPath, 1 dataset label, useas mount -> 0 patches", &testPodLabels{ + makeInputPodSpec: func() *corev1.Pod { + inputPod := testing.MakePod("test-1", "test"). + AddLabelToPodMetadata("dataset.0.id", "testds"). + AddLabelToPodMetadata("dataset.0.useas", "mount"). + AddVolumeToPod("testds"). + AddContainerToPod(testing.MakeContainer("foo"). + AddVolumeMount("/mnt/volumes/", "testds").Obj()). + Obj() + return &inputPod + }, + makeOutputPatchOperations: func() []jsonpatch.JsonPatchOperation { + patchArray := []jsonpatch.JsonPatchOperation{} + return patchArray + }, + }), ) type testAdmissionRequest struct { - inputRequest func() *admissionv1.AdmissionRequest - outResponse func() *admissionv1.AdmissionResponse + inputRequest func() admission.Request + outResponse func() admission.Response } var _ = DescribeTable("Mutation operation happens correctly", func(ts *testAdmissionRequest) { + m := DatasetPodMutator{ + Client: k8sClient, + Decoder: admission.NewDecoder(runtime.NewScheme()), + } + ctx := context.Background() + out := m.Handle(ctx, ts.inputRequest()) + Expect(out).Should(Equal(ts.outResponse())) + }, - Entry("", &testAdmissionRequest{ - inputRequest: func() *admissionv1.AdmissionRequest { - return nil + Entry("Passthrough for delete operations", &testAdmissionRequest{ + inputRequest: func() admission.Request { + req := testing.MakeAdmissionRequest(). + SetName("test"). + SetNamespace("test"). + SetOperation(admissionv1.Delete).Obj() + return req }, - outResponse: func() *admissionv1.AdmissionResponse { - return nil + outResponse: func() admission.Response { + msg := fmt.Sprintf("No Pod mutation required for operation %v.", admissionv1.Delete) + return admission.Allowed(msg) }, })) - var _ = AfterSuite(func() { By("tearing down the test environment") err := testEnv.Stop() Expect(err).NotTo(HaveOccurred()) }) + +func serialize(obj any) ([]byte, error) { + b, err := json.Marshal(obj) + if err != nil { + //logf.Errorf("could not serialize bject") + return nil, err + } + return b, nil +} diff --git a/src/dataset-operator/controllers/datasetinternal_controller.go b/src/dataset-operator/controllers/datasetinternal_controller.go index 5e15e5d1..87a46582 100644 --- a/src/dataset-operator/controllers/datasetinternal_controller.go +++ b/src/dataset-operator/controllers/datasetinternal_controller.go @@ -113,10 +113,11 @@ func (r *DatasetInternalReconciler) Reconcile(ctx context.Context, req ctrl.Requ err := r.Client.Get(context.TODO(), req.NamespacedName, foundPVC) if err == nil { reqLogger.Info("COS-related PVC still exists, deleting...") + //TODO - check before deletion if the UsedBy field for the PVC is empty delErr := r.Client.Delete(context.TODO(), foundPVC) if delErr != nil { //What happens when we cannot delete the PVC ? - reqLogger.Info("Could not delete the PVC", delErr) + reqLogger.Info("Could not delete the PVC", "Error", delErr) } return reconcile.Result{Requeue: true}, delErr } else if !errors.IsNotFound(err) { diff --git a/src/dataset-operator/go.mod b/src/dataset-operator/go.mod index 5a233d6f..e30b6e11 100644 --- a/src/dataset-operator/go.mod +++ b/src/dataset-operator/go.mod @@ -7,10 +7,12 @@ toolchain go1.22.2 require ( github.com/akolb1/gometastore v0.0.0-20221218020403-aaa7217ecd00 github.com/go-logr/logr v1.4.1 + github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 github.com/kubernetes-csi/csi-test/v5 v5.0.0 github.com/onsi/ginkgo/v2 v2.17.1 github.com/onsi/gomega v1.32.0 + go.uber.org/zap v1.26.0 gomodules.xyz/jsonpatch/v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.30.0 @@ -56,7 +58,6 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/spf13/pflag v1.0.5 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/oauth2 v0.12.0 // indirect diff --git a/src/dataset-operator/go.sum b/src/dataset-operator/go.sum index 7c038a85..c5daeec4 100644 --- a/src/dataset-operator/go.sum +++ b/src/dataset-operator/go.sum @@ -65,6 +65,7 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/src/dataset-operator/testing/wrapper.go b/src/dataset-operator/testing/wrapper.go index fe7a33d7..49517176 100644 --- a/src/dataset-operator/testing/wrapper.go +++ b/src/dataset-operator/testing/wrapper.go @@ -6,8 +6,11 @@ import ( datasetsv1alpha1 "github.com/datashim-io/datashim/src/dataset-operator/api/v1alpha1" "gomodules.xyz/jsonpatch/v2" + admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) // basic idea for templating K8s objects to be used in tests comes @@ -345,3 +348,64 @@ func (j *JSONPatchOperationWrapper) AddSecretRefsToValue(secret_names []string) func (j *JSONPatchOperationWrapper) Obj() jsonpatch.JsonPatchOperation { return j.JsonPatchOperation } + +type AdmissionRequestWrapper struct { + admission.Request +} + +func MakeAdmissionRequest() *AdmissionRequestWrapper { + return &AdmissionRequestWrapper{ + Request: admission.Request{}, + } +} + +func (r *AdmissionRequestWrapper) SetName(name string) *AdmissionRequestWrapper { + r.Name = name + return r +} + +func (r *AdmissionRequestWrapper) SetNamespace(namespace string) *AdmissionRequestWrapper { + r.Namespace = namespace + return r +} + +func (r *AdmissionRequestWrapper) SetOperation(op admissionv1.Operation) *AdmissionRequestWrapper { + r.Operation = op + return r +} + +func (r *AdmissionRequestWrapper) SetObject(obj runtime.RawExtension) *AdmissionRequestWrapper { + r.Object = obj + return r +} + +func (r *AdmissionRequestWrapper) Obj() admission.Request { + return r.Request +} + +type AdmissionResponseWrapper struct { + admission.Response +} + +func MakeAdmissionResponse() *AdmissionResponseWrapper { + return &AdmissionResponseWrapper{ + Response: admission.Response{}, + } +} + +func (rs *AdmissionResponseWrapper) AddPatches(patch jsonpatch.JsonPatchOperation) *AdmissionResponseWrapper { + if rs.Patches == nil { + rs.Patches = []jsonpatch.JsonPatchOperation{} + } + rs.Patches = append(rs.Patches, patch) + return rs +} + +func (rs *AdmissionResponseWrapper) SetAdmissionResponse(resp admissionv1.AdmissionResponse) *AdmissionResponseWrapper { + rs.AdmissionResponse = resp + return rs +} + +func (rs *AdmissionResponseWrapper) Obj() admission.Response { + return rs.Response +}