Skip to content

Commit

Permalink
Fail fast on creation and cleanup regardless of ctx - RunSnapshotRest…
Browse files Browse the repository at this point in the history
…ore (#103)

* Fail fast on creation and cleanup regardless of ctx

* changes based on comments

* changes based on comments

* add test coverage for WaitForPVCReady

* always check Events on error waiting for pvc/pod ready
  • Loading branch information
KastenMike authored Jun 30, 2022
1 parent 81fa242 commit 281e8ec
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 19 deletions.
92 changes: 91 additions & 1 deletion pkg/csi/csi_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"net/http"
"net/url"
"strings"
"time"

"github.com/kanisterio/kanister/pkg/kube"
kankube "github.com/kanisterio/kanister/pkg/kube"
kansnapshot "github.com/kanisterio/kanister/pkg/kube/snapshot"
"github.com/kanisterio/kanister/pkg/poll"
"github.com/kastenhq/kubestr/pkg/common"
"github.com/kastenhq/kubestr/pkg/csi/types"
snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
Expand All @@ -19,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
Expand All @@ -27,6 +30,13 @@ import (
"k8s.io/client-go/transport/spdy"
)

const (
defaultReadyWaitTimeout = 2 * time.Minute

PVCKind = "PersistentVolumeClaim"
PodKind ="Pod"
)

//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_argument_validator.go -package=mocks . ArgumentValidator
type ArgumentValidator interface {
//Rename
Expand Down Expand Up @@ -83,11 +93,13 @@ func (o *validateOperations) ValidateVolumeSnapshotClass(ctx context.Context, vo
type ApplicationCreator interface {
CreatePVC(ctx context.Context, args *types.CreatePVCArgs) (*v1.PersistentVolumeClaim, error)
CreatePod(ctx context.Context, args *types.CreatePodArgs) (*v1.Pod, error)
WaitForPVCReady(ctx context.Context, namespace string, pvcName string) error
WaitForPodReady(ctx context.Context, namespace string, podName string) error
}

type applicationCreate struct {
kubeCli kubernetes.Interface
k8sObjectReadyTimeout time.Duration
}

func (c *applicationCreate) CreatePVC(ctx context.Context, args *types.CreatePVCArgs) (*v1.PersistentVolumeClaim, error) {
Expand Down Expand Up @@ -187,14 +199,92 @@ func (c *applicationCreate) CreatePod(ctx context.Context, args *types.CreatePod
return podRes, nil
}

func (c *applicationCreate) WaitForPVCReady(ctx context.Context, namespace, name string) error {
if c.kubeCli == nil {
return fmt.Errorf("kubeCli not initialized")
}

err := c.waitForPVCReady(ctx, namespace, name)
if err != nil {
eventErr := c.getErrorFromEvents(ctx, namespace, name, PVCKind)
if eventErr != nil {
return errors.Wrapf(eventErr, "had issues creating PVC")
}
}
return err
}

func (c *applicationCreate) waitForPVCReady(ctx context.Context, namespace string, name string) error {
pvcReadyTimeout := c.k8sObjectReadyTimeout
if pvcReadyTimeout == 0 {
pvcReadyTimeout = defaultReadyWaitTimeout
}

timeoutCtx, waitCancel := context.WithTimeout(ctx, pvcReadyTimeout)
defer waitCancel()
return poll.Wait(timeoutCtx, func(ctx context.Context) (bool, error) {
pvc, err := c.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Get(timeoutCtx, name, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, "Could not find PVC")
}

if pvc.Status.Phase == v1.ClaimLost {
return false, fmt.Errorf("failed to create a PVC, ClaimLost")
}

return pvc.Status.Phase == v1.ClaimBound, nil
})
}

func (c *applicationCreate) WaitForPodReady(ctx context.Context, namespace string, podName string) error {
if c.kubeCli == nil {
return fmt.Errorf("kubeCli not initialized")
}
err := kankube.WaitForPodReady(ctx, c.kubeCli, namespace, podName)
err := c.waitForPodReady(ctx, namespace, podName)
if err != nil {
eventErr := c.getErrorFromEvents(ctx, namespace, podName, PodKind)
if eventErr != nil {
return errors.Wrapf(eventErr, "had issues creating Pod")
}
}
return err
}

func (c *applicationCreate) waitForPodReady(ctx context.Context, namespace string, podName string) error {
podReadyTimeout := c.k8sObjectReadyTimeout
if podReadyTimeout == 0 {
podReadyTimeout = defaultReadyWaitTimeout
}

timeoutCtx, waitCancel := context.WithTimeout(ctx, podReadyTimeout)
defer waitCancel()
err := kankube.WaitForPodReady(timeoutCtx, c.kubeCli, namespace, podName)
return err
}

func (c *applicationCreate) getErrorFromEvents(ctx context.Context, namespace, name, kind string) error {
fieldSelectors := fields.Set{
"involvedObject.kind": kind,
"involvedObject.name": name,
}.AsSelector().String()
listOptions := metav1.ListOptions{
TypeMeta: metav1.TypeMeta{Kind: kind},
FieldSelector: fieldSelectors,
}

events, eventErr := c.kubeCli.CoreV1().Events(namespace).List(ctx, listOptions)
if eventErr != nil {
return errors.Wrapf(eventErr, "failed to retreieve events for %s of kind: %s", name, kind)
}

for _, event := range events.Items {
if event.Type == v1.EventTypeWarning {
return fmt.Errorf(event.Message)
}
}
return nil
}

//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_snapshot_creator.go -package=mocks . SnapshotCreator
type SnapshotCreator interface {
NewSnapshotter() (kansnapshot.Snapshotter, error)
Expand Down
111 changes: 99 additions & 12 deletions pkg/csi/csi_ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"errors"
"fmt"
"strings"

kansnapshot "github.com/kanisterio/kanister/pkg/kube/snapshot"
"github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1alpha1"
"github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1beta1"
pkgerrors "github.com/pkg/errors"
"github.com/kastenhq/kubestr/pkg/common"
"github.com/kastenhq/kubestr/pkg/csi/types"
snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
Expand Down Expand Up @@ -462,13 +464,15 @@ func (s *CSITestSuite) TestCreatePVC(c *C) {
func (s *CSITestSuite) TestCreatePod(c *C) {
ctx := context.Background()
for _, tc := range []struct {
description string
cli kubernetes.Interface
args *types.CreatePodArgs
failCreates bool
errChecker Checker
podChecker Checker
}{
{
description: "pod with container image and runAsUser 1000 created",
cli: fake.NewSimpleClientset(),
args: &types.CreatePodArgs{
GenerateName: "name",
Expand All @@ -482,17 +486,7 @@ func (s *CSITestSuite) TestCreatePod(c *C) {
podChecker: NotNil,
},
{
cli: fake.NewSimpleClientset(),
args: &types.CreatePodArgs{
GenerateName: "name",
PVCName: "pvcname",
Namespace: "ns",
Command: []string{"somecommand"},
},
errChecker: IsNil,
podChecker: NotNil,
},
{
description: "Pod creation error on kubeCli",
cli: fake.NewSimpleClientset(),
args: &types.CreatePodArgs{
GenerateName: "name",
Expand All @@ -505,6 +499,7 @@ func (s *CSITestSuite) TestCreatePod(c *C) {
podChecker: NotNil,
},
{
description: "Pod generate name arg not set",
cli: fake.NewSimpleClientset(),
args: &types.CreatePodArgs{
GenerateName: "",
Expand All @@ -516,6 +511,7 @@ func (s *CSITestSuite) TestCreatePod(c *C) {
podChecker: IsNil,
},
{
description: "PVC name not set error",
cli: fake.NewSimpleClientset(),
args: &types.CreatePodArgs{
GenerateName: "name",
Expand All @@ -527,6 +523,7 @@ func (s *CSITestSuite) TestCreatePod(c *C) {
podChecker: IsNil,
},
{
description: "default namespace pod is created",
cli: fake.NewSimpleClientset(),
args: &types.CreatePodArgs{
GenerateName: "name",
Expand All @@ -538,6 +535,7 @@ func (s *CSITestSuite) TestCreatePod(c *C) {
podChecker: IsNil,
},
{
description: "ns namespace pod is created",
cli: fake.NewSimpleClientset(),
args: &types.CreatePodArgs{
GenerateName: "name",
Expand All @@ -549,12 +547,14 @@ func (s *CSITestSuite) TestCreatePod(c *C) {
podChecker: NotNil,
},
{
description: "kubeCli not initialized",
cli: nil,
args: &types.CreatePodArgs{},
errChecker: NotNil,
podChecker: IsNil,
},
} {
fmt.Println("test:", tc.description)
creator := &applicationCreate{kubeCli: tc.cli}
if tc.failCreates {
creator.kubeCli.(*fake.Clientset).Fake.PrependReactor("create", "pods", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
Expand Down Expand Up @@ -598,7 +598,6 @@ func (s *CSITestSuite) TestCreatePod(c *C) {
} else {
c.Check(pod.Spec.SecurityContext, IsNil)
}

}
}
}
Expand Down Expand Up @@ -1160,3 +1159,91 @@ func (s *CSITestSuite) TestDeleteSnapshot(c *C) {
c.Check(err, tc.errChecker)
}
}

func (s *CSITestSuite) TestWaitForPVCReady(c *C) {
ctx := context.Background()
const ns = "ns"
const pvc = "pvc"
boundPVC := s.getPVC(ns, pvc, v1.ClaimBound)
claimLostPVC := s.getPVC(ns, pvc, v1.ClaimLost)
stuckPVC := s.getPVC(ns, pvc, "")
normalGetFunc := func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return
}
deadlineExceededGetFunc := func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, pkgerrors.Wrapf(context.DeadlineExceeded, "some wrapped error")
}

warningEvent := v1.Event{
Type: v1.EventTypeWarning,
Message: "waiting for a volume to be created, either by external provisioner \"ceph.com/rbd\" or manually created by system administrator",
}
for _, tc := range []struct {
description string
cli kubernetes.Interface
pvcGetFunc func(action k8stesting.Action) (handled bool, ret runtime.Object, err error)
eventsList []v1.Event
errChecker Checker
errString string
}{
{
description: "Happy path",
cli: fake.NewSimpleClientset(boundPVC),
pvcGetFunc: normalGetFunc,
errChecker: IsNil,
},
{
description: "Missing PVC",
cli: fake.NewSimpleClientset(),
pvcGetFunc: normalGetFunc,
errChecker: NotNil,
errString: "Could not find PVC",
},
{
description: "PVC ClaimLost",
cli: fake.NewSimpleClientset(claimLostPVC),
pvcGetFunc: normalGetFunc,
errChecker: NotNil,
errString: "ClaimLost",
},
{
description: "context.DeadlineExceeded but no event warnings",
cli: fake.NewSimpleClientset(stuckPVC),
pvcGetFunc: deadlineExceededGetFunc,
errChecker: NotNil,
errString: context.DeadlineExceeded.Error(),
},
{
description: "context.DeadlineExceeded, unable to provision PVC",
cli: fake.NewSimpleClientset(stuckPVC),
pvcGetFunc: deadlineExceededGetFunc,
eventsList: []v1.Event{warningEvent},
errChecker: NotNil,
errString: warningEvent.Message,
},
}{
fmt.Println("test:", tc.description)
creator := &applicationCreate{kubeCli: tc.cli}
creator.kubeCli.(*fake.Clientset).PrependReactor("get", "persistentvolumeclaims", tc.pvcGetFunc)
creator.kubeCli.(*fake.Clientset).PrependReactor("list", "events", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &v1.EventList{Items: tc.eventsList}, nil
})
err := creator.WaitForPVCReady(ctx, ns, pvc)
c.Check(err, tc.errChecker)
if err != nil {
c.Assert(strings.Contains(err.Error(), tc.errString), Equals, true)
}
}
}

func (s *CSITestSuite) getPVC(ns, pvc string, phase v1.PersistentVolumeClaimPhase) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvc,
Namespace: ns,
},
Status: v1.PersistentVolumeClaimStatus {
Phase: phase,
},
}
}
13 changes: 13 additions & 0 deletions pkg/csi/mocks/mock_application_creator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 281e8ec

Please sign in to comment.