Skip to content

Commit

Permalink
fix(operator): prevent Tasks in terminal state from leaking PVCs (#93)
Browse files Browse the repository at this point in the history
prevent tasks in terminal state from leaking PVCs

Signed-off-by: Adrian Vaca Humanes <[email protected]>
  • Loading branch information
adrian-vaca-humanes-wt authored and strieflin committed Oct 31, 2024
1 parent b46b543 commit 6029adf
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 14 deletions.
4 changes: 4 additions & 0 deletions NOTICE.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ file in the Carbyne Stack

> **NOTE**: Please keep the following list of contributors sorted.
### Resolve.tech

- Adrián Vaca Humanes [[email protected]](mailto:[email protected])

### Robert Bosch GmbH

- Becker Sebastian
Expand Down
61 changes: 54 additions & 7 deletions klyshko-operator/controllers/controller_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2022-2023 - for information on the respective copyright owner
Copyright (c) 2022-2024 - for information on the respective copyright owner
see the NOTICE file and/or the repository https://github.com/carbynestack/klyshko.
SPDX-License-Identifier: Apache-2.0
Expand All @@ -10,14 +10,20 @@ package controllers
import (
"context"
"fmt"
"io"
"math"
"path/filepath"
"strconv"
"strings"
"time"

klyshkov1alpha1 "github.com/carbynestack/klyshko/api/v1alpha1"
"github.com/carbynestack/klyshko/castor"
"github.com/google/uuid"
"github.com/jarcoal/httpmock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
clientv3 "go.etcd.io/etcd/client/v3"
"io"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -26,16 +32,11 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/utils/pointer"
"math"
"path/filepath"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"strconv"
"strings"
"time"
)

const (
Expand Down Expand Up @@ -343,6 +344,7 @@ var _ = Describe("Generating tuples", func() {
jobs []klyshkov1alpha1.TupleGenerationJob
localTasksByVCP []klyshkov1alpha1.TupleGenerationTask
generatorPodsByVCP []v1.Pod
taskPVCsByVCP []v1.PersistentVolumeClaim
)

BeforeEach(func() {
Expand Down Expand Up @@ -374,6 +376,7 @@ var _ = Describe("Generating tuples", func() {
localTasksByVCP = ensureTasksCreatedOnEachVcp(ctx, vc, scheduler, jobs, klyshkov1alpha1.TaskGenerating)

generatorPodsByVCP = ensureGeneratorPodsCreatedOnEachVcp(ctx, vc, localTasksByVCP)
taskPVCsByVCP = ensureTaskPVCsCreatedOnEachVcp(ctx, vc, localTasksByVCP)
ensureJobState(ctx, vc, scheduler, uuid.MustParse(jobs[0].Spec.ID), klyshkov1alpha1.JobRunning)
})

Expand All @@ -392,6 +395,17 @@ var _ = Describe("Generating tuples", func() {
Expect(vc.vcps[i].k8sClient.Status().Update(ctx, &pod)).Should(Succeed())
}
ensureJobState(ctx, vc, scheduler, uuid.MustParse(jobs[0].Spec.ID), klyshkov1alpha1.JobFailed)

for i := 0; i < NumberOfVCPs; i++ {
key := client.ObjectKey{
Namespace: taskPVCsByVCP[i].GetNamespace(),
Name: taskPVCsByVCP[i].GetName(),
}
Eventually(func() bool {
return apierrors.IsNotFound(vc.vcps[i].k8sClient.Get(ctx, key, &jobs[i]))
}, Timeout, PollingInterval).Should(BeTrue())
}

})
})

Expand All @@ -411,6 +425,16 @@ var _ = Describe("Generating tuples", func() {
Expect(vc.vcps[i].k8sClient.Status().Update(ctx, &pod)).Should(Succeed())
}
ensureJobState(ctx, vc, scheduler, uuid.MustParse(jobs[0].Spec.ID), klyshkov1alpha1.JobFailed)

for i := 0; i < NumberOfVCPs; i++ {
key := client.ObjectKey{
Namespace: taskPVCsByVCP[i].GetNamespace(),
Name: taskPVCsByVCP[i].GetName(),
}
Eventually(func() bool {
return apierrors.IsNotFound(vc.vcps[i].k8sClient.Get(ctx, key, &jobs[i]))
}, Timeout, PollingInterval).Should(BeTrue())
}
})
})

Expand Down Expand Up @@ -519,6 +543,29 @@ func ensurePodsCreatedOnEachVcp(ctx context.Context, vc *vc, name func(int) type
return pods
}

// Ensures the PVCs associated to a task have been created, with the main purpose of checking for their
// non-existence after cleanup
func ensureTaskPVCsCreatedOnEachVcp(ctx context.Context, vc *vc, localTasks []klyshkov1alpha1.TupleGenerationTask) []v1.PersistentVolumeClaim {
pvcs := make([]v1.PersistentVolumeClaim, NumberOfVCPs)
for i := 0; i < NumberOfVCPs; i++ {
taskKey, _ := taskKeyFromName(localTasks[i].Namespace, localTasks[i].Name)
pvc := &v1.PersistentVolumeClaim{}
name := types.NamespacedName{
Name: pvcName(*taskKey),
Namespace: taskKey.Namespace,
}
Eventually(func() bool {
err := vc.vcps[i].k8sClient.Get(ctx, name, pvc)
if err != nil {
return false
}
return true
}, Timeout, PollingInterval).Should(BeTrue())
pvcs[i] = *pvc
}
return pvcs
}

// Ensures that provisioner pods associated with the respective tasks eventually become available for the given job in
// each VCP of the given VC. In addition, it is checked that the pod is owned by the respective task.
func ensureProvisionerPodsCreatedOnEachVcp(ctx context.Context, vc *vc, jobs []klyshkov1alpha1.TupleGenerationJob, localTasks []klyshkov1alpha1.TupleGenerationTask) []v1.Pod {
Expand Down
40 changes: 33 additions & 7 deletions klyshko-operator/controllers/tuplegenerationtask_controller.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2022-2023 - for information on the respective copyright owner
Copyright (c) 2022-2024 - for information on the respective copyright owner
see the NOTICE file and/or the repository https://github.com/carbynestack/klyshko.
SPDX-License-Identifier: Apache-2.0
Expand All @@ -11,6 +11,9 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"

"github.com/carbynestack/klyshko/logging"
clientv3 "go.etcd.io/etcd/client/v3"
v1 "k8s.io/api/core/v1"
Expand All @@ -19,8 +22,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"strconv"
"strings"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -59,7 +60,7 @@ func (r *TupleGenerationTaskReconciler) Reconcile(ctx context.Context, req ctrl.
logger := log.FromContext(ctx).WithValues("Task.Name", req.Name)
logger.V(logging.DEBUG).Info("Reconciling tuple generation task")

taskKey, err := r.taskKeyFromName(req.Namespace, req.Name)
taskKey, err := taskKeyFromName(req.Namespace, req.Name)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get key for task %v: %w", req.Name, err)
}
Expand Down Expand Up @@ -233,9 +234,12 @@ func (r *TupleGenerationTaskReconciler) Reconcile(ctx context.Context, req ctrl.
Requeue: true,
}, r.setState(ctx, *taskKey, status, klyshkov1alpha1.TaskFailed)
}
case klyshkov1alpha1.TaskFailed, klyshkov1alpha1.TaskCompleted:
logger.V(logging.DEBUG).Info("Task reached a terminal state")
return ctrl.Result{}, r.deletePVC(ctx, taskKey)
default:
return ctrl.Result{}, fmt.Errorf("unexpected state for Task %v, PVC not reclaimed", req.Name)
}

logger.V(logging.DEBUG).Info("Desired state reached")
return ctrl.Result{}, nil
}

Expand All @@ -250,7 +254,7 @@ func (r *TupleGenerationTaskReconciler) SetupWithManager(mgr ctrl.Manager) error

// taskKeyFromName creates a RosterEntryKey from the given name and namespace. Expects that the zero-based VCP
// identifier is appended with a hyphen to the name.
func (r *TupleGenerationTaskReconciler) taskKeyFromName(namespace string, name string) (*RosterEntryKey, error) {
func taskKeyFromName(namespace string, name string) (*RosterEntryKey, error) {
parts := strings.Split(name, "-")
vcpID := parts[len(parts)-1]
jobName := strings.Join(parts[:len(parts)-1], "-")
Expand Down Expand Up @@ -366,6 +370,28 @@ func (r *TupleGenerationTaskReconciler) getOrCreatePVC(ctx context.Context, key
return pvc, nil
}

// deletePVC deletes a PVC associated to a given task
func (r *TupleGenerationTaskReconciler) deletePVC(ctx context.Context, key *RosterEntryKey) error {
logger := log.FromContext(ctx).WithValues("Task.Key", key)
name := types.NamespacedName{
Name: pvcName(*key),
Namespace: key.Namespace,
}
found := &v1.PersistentVolumeClaim{}
err := r.Get(ctx, name, found)
if err != nil {
return fmt.Errorf("to be deleted persistent volume claim not found for task %v: %w", key, err)
}

err = r.Delete(ctx, found)
if err != nil {
return fmt.Errorf("persistent volume claim deletion failed for task %v: %w", key, err)
}

logger.V(logging.DEBUG).Info("Deleted Persistent Volume Claim for task")
return nil
}

// provisionerPodName returns the name for the provisioner pod used for the task with the given key.
func (r *TupleGenerationTaskReconciler) provisionerPodName(key RosterEntryKey) string {
return key.Name + "-provisioner"
Expand Down

0 comments on commit 6029adf

Please sign in to comment.