Skip to content

Commit

Permalink
Fix repeating mount entries (#361)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikumar003 authored May 21, 2024
1 parent 5efeb32 commit 11b79bb
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 19 deletions.
23 changes: 16 additions & 7 deletions src/dataset-operator/admissioncontroller/mutatingwebhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"strings"

datasetsv1alpha1 "github.com/datashim-io/datashim/src/dataset-operator/api/v1alpha1"
"github.com/go-logr/logr"
jsonpatch "gomodules.xyz/jsonpatch/v2"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
Expand All @@ -28,7 +29,7 @@ const (
)

var (
log = ctrl.Log.WithName("datashim-webhook")
log = logf.Log.WithName("datashim-webhook")
)

//following the kubebuilder example for the pod mutator
Expand All @@ -41,10 +42,13 @@ type DatasetPodMutator struct {

func (m *DatasetPodMutator) Handle(ctx context.Context, req admission.Request) admission.Response {
// Mutate mutates

log = logf.FromContext(ctx)
log = log.WithValues("admission pod request", req)
log.V(1).Info("webhook received", "request", req)

if req.Operation != admissionv1.Create {
return admission.Allowed(fmt.Sprintf("No Pod mutation required for operation %v.", req.Operation))
}

pod := &corev1.Pod{}

err := m.Decoder.Decode(req, pod)
Expand Down Expand Up @@ -134,6 +138,7 @@ func (d *DatasetInput) String() string {

func DatasetInputFromPod(pod *corev1.Pod) (map[int]*DatasetInput, error) {
// Format is {"id": {"index": <str>, "useas": mount/configmap}
//log = log.WithName("dataset-label-processing")
log.V(1).Info("Pod labels", "labels", pod.Labels)

datasets := map[int]*DatasetInput{}
Expand Down Expand Up @@ -243,6 +248,7 @@ func RetrieveDatasetsFromAPIServer(ctx context.Context, client client.Client, po
}

func PatchPodWithDatasetLabels(pod *corev1.Pod, datasets map[int]*DatasetInput) ([]jsonpatch.JsonPatchOperation, error) {
//log = log.WithName("pod-patcher")
patchops := []jsonpatch.JsonPatchOperation{}

if len(datasets) == 0 {
Expand Down Expand Up @@ -304,7 +310,7 @@ func PatchPodWithDatasetLabels(pod *corev1.Pod, datasets map[int]*DatasetInput)

if len(datasets_tomount) > 0 {
log.V(1).Info("Patching volumes to Pod Spec", "datasets", datasets_tomount)
patch_ds := patchPodSpecWithDatasetPVCs(pod, datasets_tomount)
patch_ds := patchPodSpecWithDatasetPVCs(pod, datasets_tomount, log)
patchops = append(patchops, patch_ds...)
}

Expand All @@ -321,7 +327,7 @@ func PatchPodWithDatasetLabels(pod *corev1.Pod, datasets map[int]*DatasetInput)
return patchops, nil
}

func patchPodSpecWithDatasetPVCs(pod *corev1.Pod, datasets map[int]*DatasetInput) (patches []jsonpatch.JsonPatchOperation) {
func patchPodSpecWithDatasetPVCs(pod *corev1.Pod, datasets map[int]*DatasetInput, log logr.Logger) (patches []jsonpatch.JsonPatchOperation) {
patches = []jsonpatch.JsonPatchOperation{}

vol_id := len(pod.Spec.Volumes)
Expand Down Expand Up @@ -401,8 +407,9 @@ func patchContainersWithDatasetVolumes(pod *corev1.Pod, datasets map[int]*Datase
mount_idx := len(mounts)

for o := range order {
exists, _ := in_array(datasets[o], mount_names)
exists, _ := in_array(datasets[o].name, mount_names)
if !exists {
log.V(1).Info("Dataset is not already mounted", "dataset", datasets[o], "pod", pod.Name)
patch := jsonpatch.JsonPatchOperation{
Operation: "add",
Path: "/spec/" + container_typ + "/" + fmt.Sprint(container_idx) + "/volumeMounts/" + fmt.Sprint(mount_idx),
Expand All @@ -413,6 +420,8 @@ func patchContainersWithDatasetVolumes(pod *corev1.Pod, datasets map[int]*Datase
}
patchOps = append(patchOps, patch)
mount_idx += 1
} else {
log.V(1).Info("Dataset is already mounted", "dataset", datasets[o], "pod", pod.Name)
}
}
}
Expand Down
89 changes: 79 additions & 10 deletions src/dataset-operator/admissioncontroller/mutatingwebhook_test.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
package admissioncontroller

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"

datasetsv1alpha1 "github.com/datashim-io/datashim/src/dataset-operator/api/v1alpha1"
testing "github.com/datashim-io/datashim/src/dataset-operator/testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.uber.org/zap/zapcore"
jsonpatch "gomodules.xyz/jsonpatch/v2"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment

type testPodLabels struct {
Expand All @@ -26,7 +35,7 @@ type testPodLabels struct {
}

var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.Level(zapcore.DebugLevel), zap.UseDevMode(true)))
By("bootstrapping test environment")

use_existing_cluster := true
Expand All @@ -52,7 +61,7 @@ var _ = BeforeSuite(func() {

//+kubebuilder:scaffold:scheme

k8sClient, err := client.New(cfg, client.Options{Scheme: scheme.Scheme})
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())

Expand All @@ -62,6 +71,7 @@ var _ = DescribeTable("Pod is mutated correctly",

pod := tc.makeInputPodSpec()
datasets, err := DatasetInputFromPod(pod)

Expect(err).ShouldNot(HaveOccurred())

Expect(PatchPodWithDatasetLabels(pod, datasets)).
Expand Down Expand Up @@ -227,6 +237,27 @@ var _ = DescribeTable("Pod is mutated correctly",
return patchArray
},
}),
Entry("Pod with 0 volumes, 1 dataset label, override mount point -> 1 patch with the overridden mountpoint", &testPodLabels{
makeInputPodSpec: func() *corev1.Pod {
inputPod := testing.MakePod("test-1", "test").
AddLabelToPodMetadata("dataset.0.id", "testds").
AddLabelToPodMetadata("dataset.0.useas", "mount").
AddContainerToPod(testing.MakeContainer("foo").
AddVolumeMount("/mount/testds", "testds").Obj()).
Obj()
return &inputPod
},
makeOutputPatchOperations: func() []jsonpatch.JsonPatchOperation {
patchArray := []jsonpatch.JsonPatchOperation{
testing.MakeJSONPatchOperation().
SetOperation("add").
SetVolumeasPath(0).
SetPVCasValue("testds").
Obj(),
}
return patchArray
},
}),
Entry("Pod with 1 volumes, 1 dataset label, useas configmap -> 1 configmap", &testPodLabels{
makeInputPodSpec: func() *corev1.Pod {
inputPod := testing.MakePod("test-1", "test").
Expand Down Expand Up @@ -313,28 +344,66 @@ var _ = DescribeTable("Pod is mutated correctly",
return []jsonpatch.JsonPatchOperation{}
},
}),

Entry("Pod with 1 volumes, different mountPath, 1 dataset label, useas mount -> 0 patches", &testPodLabels{
makeInputPodSpec: func() *corev1.Pod {
inputPod := testing.MakePod("test-1", "test").
AddLabelToPodMetadata("dataset.0.id", "testds").
AddLabelToPodMetadata("dataset.0.useas", "mount").
AddVolumeToPod("testds").
AddContainerToPod(testing.MakeContainer("foo").
AddVolumeMount("/mnt/volumes/", "testds").Obj()).
Obj()
return &inputPod
},
makeOutputPatchOperations: func() []jsonpatch.JsonPatchOperation {
patchArray := []jsonpatch.JsonPatchOperation{}
return patchArray
},
}),
)

type testAdmissionRequest struct {
inputRequest func() *admissionv1.AdmissionRequest
outResponse func() *admissionv1.AdmissionResponse
inputRequest func() admission.Request
outResponse func() admission.Response
}

var _ = DescribeTable("Mutation operation happens correctly",
func(ts *testAdmissionRequest) {

m := DatasetPodMutator{
Client: k8sClient,
Decoder: admission.NewDecoder(runtime.NewScheme()),
}
ctx := context.Background()
out := m.Handle(ctx, ts.inputRequest())
Expect(out).Should(Equal(ts.outResponse()))

},
Entry("", &testAdmissionRequest{
inputRequest: func() *admissionv1.AdmissionRequest {
return nil
Entry("Passthrough for delete operations", &testAdmissionRequest{
inputRequest: func() admission.Request {
req := testing.MakeAdmissionRequest().
SetName("test").
SetNamespace("test").
SetOperation(admissionv1.Delete).Obj()
return req
},
outResponse: func() *admissionv1.AdmissionResponse {
return nil
outResponse: func() admission.Response {
msg := fmt.Sprintf("No Pod mutation required for operation %v.", admissionv1.Delete)
return admission.Allowed(msg)
},
}))

var _ = AfterSuite(func() {
By("tearing down the test environment")
err := testEnv.Stop()
Expect(err).NotTo(HaveOccurred())
})

func serialize(obj any) ([]byte, error) {
b, err := json.Marshal(obj)
if err != nil {
//logf.Errorf("could not serialize bject")
return nil, err
}
return b, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,11 @@ func (r *DatasetInternalReconciler) Reconcile(ctx context.Context, req ctrl.Requ
err := r.Client.Get(context.TODO(), req.NamespacedName, foundPVC)
if err == nil {
reqLogger.Info("COS-related PVC still exists, deleting...")
//TODO - check before deletion if the UsedBy field for the PVC is empty
delErr := r.Client.Delete(context.TODO(), foundPVC)
if delErr != nil {
//What happens when we cannot delete the PVC ?
reqLogger.Info("Could not delete the PVC", delErr)
reqLogger.Info("Could not delete the PVC", "Error", delErr)
}
return reconcile.Result{Requeue: true}, delErr
} else if !errors.IsNotFound(err) {
Expand Down
3 changes: 2 additions & 1 deletion src/dataset-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ toolchain go1.22.2
require (
github.com/akolb1/gometastore v0.0.0-20221218020403-aaa7217ecd00
github.com/go-logr/logr v1.4.1
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/kubernetes-csi/csi-test/v5 v5.0.0
github.com/onsi/ginkgo/v2 v2.17.1
github.com/onsi/gomega v1.32.0
go.uber.org/zap v1.26.0
gomodules.xyz/jsonpatch/v2 v2.4.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.30.0
Expand Down Expand Up @@ -56,7 +58,6 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.12.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions src/dataset-operator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
64 changes: 64 additions & 0 deletions src/dataset-operator/testing/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (

datasetsv1alpha1 "github.com/datashim-io/datashim/src/dataset-operator/api/v1alpha1"
"gomodules.xyz/jsonpatch/v2"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// basic idea for templating K8s objects to be used in tests comes
Expand Down Expand Up @@ -345,3 +348,64 @@ func (j *JSONPatchOperationWrapper) AddSecretRefsToValue(secret_names []string)
func (j *JSONPatchOperationWrapper) Obj() jsonpatch.JsonPatchOperation {
return j.JsonPatchOperation
}

type AdmissionRequestWrapper struct {
admission.Request
}

func MakeAdmissionRequest() *AdmissionRequestWrapper {
return &AdmissionRequestWrapper{
Request: admission.Request{},
}
}

func (r *AdmissionRequestWrapper) SetName(name string) *AdmissionRequestWrapper {
r.Name = name
return r
}

func (r *AdmissionRequestWrapper) SetNamespace(namespace string) *AdmissionRequestWrapper {
r.Namespace = namespace
return r
}

func (r *AdmissionRequestWrapper) SetOperation(op admissionv1.Operation) *AdmissionRequestWrapper {
r.Operation = op
return r
}

func (r *AdmissionRequestWrapper) SetObject(obj runtime.RawExtension) *AdmissionRequestWrapper {
r.Object = obj
return r
}

func (r *AdmissionRequestWrapper) Obj() admission.Request {
return r.Request
}

type AdmissionResponseWrapper struct {
admission.Response
}

func MakeAdmissionResponse() *AdmissionResponseWrapper {
return &AdmissionResponseWrapper{
Response: admission.Response{},
}
}

func (rs *AdmissionResponseWrapper) AddPatches(patch jsonpatch.JsonPatchOperation) *AdmissionResponseWrapper {
if rs.Patches == nil {
rs.Patches = []jsonpatch.JsonPatchOperation{}
}
rs.Patches = append(rs.Patches, patch)
return rs
}

func (rs *AdmissionResponseWrapper) SetAdmissionResponse(resp admissionv1.AdmissionResponse) *AdmissionResponseWrapper {
rs.AdmissionResponse = resp
return rs
}

func (rs *AdmissionResponseWrapper) Obj() admission.Response {
return rs.Response
}

0 comments on commit 11b79bb

Please sign in to comment.