Skip to content

Commit

Permalink
e2e: default policy test (#1007)
Browse files Browse the repository at this point in the history
e2e: Add waiting mechanism and correct event watching/listing to default policy test

Structure changes to deploy.go

e2e: rework and fetch event from the watch.Event directly

e2e: Add default policy into policy test package
  • Loading branch information
jmxnzo authored Nov 21, 2024
1 parent ba7fb9d commit 187ea01
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 1 deletion.
103 changes: 103 additions & 0 deletions e2e/internal/kubeclient/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ const (
InitContainersRunning
)

// WaitEventCondition is an enum type for the possible wait conditions when using `kubeclient.WaitForEvent`.
type WaitEventCondition int

const (
_ WaitEventCondition = iota
// StartingBlocked waits until a specific FailedCreatePodSandBox Event is detected which indicates that the container does not start.
StartingBlocked
)

// ResourceWaiter is implemented by resources that can be waited for with WaitFor.
type ResourceWaiter interface {
kind() string
Expand Down Expand Up @@ -225,6 +234,32 @@ func (c *Kubeclient) checkIfRunning(ctx context.Context, name string, namespace
return true, nil
}

// IsStartingBlocked checks whether the FailedCreatePodSandBox Event occurred which indicates that the SetPolicy request is rejected and the Kata Shim fails to start the Pod sandbox.
func (c *Kubeclient) IsStartingBlocked(name string, namespace string, resource ResourceWaiter, evt watch.Event, startingPoint time.Time) (bool, error) {
switch evt.Type {
case watch.Error:
return false, fmt.Errorf("Watcher of %s %s/%s received an error event", resource.kind(), namespace, name)
case watch.Added:
fallthrough
case watch.Modified:
logger := c.log.With("namespace", namespace)
event, ok := evt.Object.(*corev1.Event)
if !ok {
return false, fmt.Errorf("watcher received unexpected type %T", evt.Object)
}

// Expected event: Reason: FailedCreatePodSandBox
// TODO(jmxnzo): Add patch to the existing error message in Kata Shim, to specifically allow detecting start-up of containers without policy annotation.
if (event.LastTimestamp.After(startingPoint)) && event.Reason == "FailedCreatePodSandBox" {
logger.Debug("Pod did not start", "name", name, "reason", event.Reason, "timestamp of failure", event.LastTimestamp.String())
return true, nil
}
return false, nil
default:
return false, fmt.Errorf("unexpected watch event while waiting for %s %s/%s: type=%s, object=%#v", resource.kind(), namespace, name, evt.Type, evt.Object)
}
}

// WaitFor watches the given resource kind and blocks until the desired number of pods are
// ready or the context expires (is cancelled or times out).
func (c *Kubeclient) WaitFor(ctx context.Context, condition WaitCondition, resource ResourceWaiter, namespace, name string) error {
Expand Down Expand Up @@ -370,6 +405,74 @@ loop:
}
}

// WaitForEvent watches the EventList as long as the Event corresponding to the WaitCondition occurred after calling the function. It reimplements WaitFor() specifically designed to wait for ocuring events.
func (c *Kubeclient) WaitForEvent(ctx context.Context, condition WaitEventCondition, resource ResourceWaiter, namespace, name string) error {
// StartingPoint is saved right here to avoid the processing of past events in the checking function! This was introduced, because otherwise calling waitForEvent multiple times
// resulted in reusing events with the same timestamp.
startingPoint := time.Now()
retryCounter := 30
retryLoop:
for {
// Watcher which preprocesses the eventList for the defined resource, based on the involvedObject name and the resource kind.
watcher, err := c.Client.CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{FieldSelector: "involvedObject.name=" + name, TypeMeta: metav1.TypeMeta{Kind: resource.kind()}})
if err != nil {
// If the server is down (because K3s was restarted), wait for a
// second and try again.
retryCounter--
if retryCounter != 0 {
sleep, cancel := context.WithTimeout(ctx, time.Second*1)
defer cancel()
<-sleep.Done()
continue retryLoop
}
return err
}
for {
evt, ok := <-watcher.ResultChan()

if !ok {
origErr := ctx.Err()
if origErr == nil {
retryCounter--
if retryCounter != 0 {
continue retryLoop
}
return fmt.Errorf("watcher for %s %s/%s unexpectedly closed", resource.kind(), namespace, name)
}
logger := c.log.With("namespace", namespace)
logger.Error("failed to wait for event", "condition", condition, "kind", resource, "name", name, "contextErr", ctx.Err())
if ctx.Err() != context.DeadlineExceeded {
return ctx.Err()
}
// Fetch and print debug information.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
pods, err := resource.getPods(ctx, c, namespace, name) //nolint:contextcheck // The parent context expired.
if err != nil {
logger.Error("could not fetch pods for resource", "kind", resource.kind(), "name", name, "error", err)
return origErr
}
for _, pod := range pods {
if !isPodReady(&pod) {
logger.Debug("pod not ready", "name", pod.Name, "status", c.toJSON(pod.Status))
}
}
return origErr
}
switch condition {
case StartingBlocked:
blocked, err := c.IsStartingBlocked(name, namespace, resource, evt, startingPoint)
if err != nil {
return err
}
if blocked {
return nil
}
}
}
}
}

func (c *Kubeclient) toJSON(a any) string {
s, err := json.Marshal(a)
if err != nil {
Expand Down
27 changes: 26 additions & 1 deletion e2e/policy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
opensslBackend = "openssl-backend"
opensslFrontend = "openssl-frontend"
coordinator = "coordinator"
// Persistent pod identifier of StatefulSet Coordinator is used.
coordinatorPod = "coordinator-0"
)

var (
Expand All @@ -58,11 +60,34 @@ func TestPolicy(t *testing.T) {

ct.Init(t, resources)

// Apply deployment using default policies
require.True(t, t.Run("apply", ct.Apply), "Kubernetes resources need to be applied for subsequent tests")

t.Run("check containers without policy annotation do not start", func(t *testing.T) {
require := require.New(t)
ctx, cancel := context.WithTimeout(context.Background(), ct.FactorPlatformTimeout(2*time.Minute))
defer cancel()

c := kubeclient.NewForTest(t)

t.Log("Waiting to ensure container start up failed")

err := c.WaitForEvent(ctx, kubeclient.StartingBlocked, kubeclient.Pod{}, ct.Namespace, coordinatorPod)
require.NoError(err)

t.Log("Restarting container")

require.NoError(c.Restart(ctx, kubeclient.Deployment{}, ct.Namespace, coordinator))
t.Log("Waiting to ensure container start up failed")

errRst := c.WaitForEvent(ctx, kubeclient.StartingBlocked, kubeclient.Pod{}, ct.Namespace, coordinatorPod)
require.NoError(errRst)
})

// initial deployment with pod allowed
require.True(t, t.Run("generate", ct.Generate), "contrast generate needs to succeed for subsequent tests")

require.True(t, t.Run("apply", ct.Apply), "Kubernetes resources need to be applied for subsequent tests")

require.True(t, t.Run("set", ct.Set), "contrast set needs to succeed for subsequent tests")
require.True(t, t.Run("contrast verify", ct.Verify), "contrast verify needs to succeed for subsequent tests")

Expand Down

0 comments on commit 187ea01

Please sign in to comment.