Skip to content

Commit

Permalink
Fix + test Pod deletion in KDD syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
caseydavenport authored and Neil Jerram committed Mar 27, 2017
1 parent 3893025 commit ab2351e
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 48 deletions.
27 changes: 14 additions & 13 deletions lib/backend/k8s/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,9 @@ func (c converter) globalConfigToTPR(kvp *model.KVPair) thirdparty.GlobalConfig
return tpr
}

// isCalicoPod returns true if the pod should be shown as a workloadEndpoint
// isReadyCalicoPod returns true if the pod should be shown as a workloadEndpoint
// in the Calico API and false otherwise.
func (c converter) isCalicoPod(pod *kapiv1.Pod) bool {
func (c converter) isReadyCalicoPod(pod *kapiv1.Pod) bool {
if c.isHostNetworked(pod) {
log.WithField("pod", pod.Name).Debug("Pod is host networked.")
return false
Expand All @@ -219,23 +219,24 @@ func (c converter) hasIPAddress(pod *kapiv1.Pod) bool {
return pod.Status.PodIP != ""
}

// podToWorkloadEndpoint converts a Pod to a WorkloadEndpoint. It assumes the calling code
// has verified that the provided Pod is valid to convert to a WorkloadEndpoint.
func (c converter) podToWorkloadEndpoint(pod *kapiv1.Pod) (*model.KVPair, error) {
// Pull out the profile and workload ID based on pod name and Namespace.
profile := fmt.Sprintf("ns.projectcalico.org/%s", pod.ObjectMeta.Namespace)
workload := fmt.Sprintf("%s.%s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)

// If the pod isn't ready, we can't parse it.
if !c.isCalicoPod(pod) {
return nil, fmt.Errorf("Pod is not ready / valid. pod=%s", pod.Name)
}

// Parse the Pod's IP address.
_, ipNet, err := cnet.ParseCIDR(fmt.Sprintf("%s/32", pod.Status.PodIP))
if err != nil {
log.WithFields(log.Fields{"ip": pod.Status.PodIP, "pod": pod.Name}).WithError(err).Error("Failed to parse pod IP")
return nil, err
// We do, in some circumstances, want to parse Pods without an IP address. For example,
// a DELETE update will not include an IP.
ipNets := []cnet.IPNet{}
if c.hasIPAddress(pod) {
_, ipNet, err := cnet.ParseCIDR(fmt.Sprintf("%s/32", pod.Status.PodIP))
if err != nil {
log.WithFields(log.Fields{"ip": pod.Status.PodIP, "pod": pod.Name}).WithError(err).Error("Failed to parse pod IP")
return nil, err
}
ipNets = append(ipNets, *ipNet)
}
ipNets := []cnet.IPNet{*ipNet}

// Generate the interface name and MAC based on workload. This must match
// the host-side veth configured by the CNI plugin.
Expand Down
11 changes: 4 additions & 7 deletions lib/backend/k8s/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ import (
extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/util/intstr"

"github.com/projectcalico/libcalico-go/lib/backend/k8s/resources"
"github.com/projectcalico/libcalico-go/lib/net"
)

var _ = Describe("Test parsing strings", func() {
Expand Down Expand Up @@ -129,7 +126,7 @@ var _ = Describe("Test Pod conversion", func() {
Expect(wep.Revision.(string)).To(Equal("1234"))
})

It("should fail to parse a Pod without an IP to a WorkloadEndpoint", func() {
It("should not parse a Pod without an IP to a WorkloadEndpoint", func() {
pod := k8sapi.Pod{
ObjectMeta: k8sapi.ObjectMeta{
Name: "podA",
Expand All @@ -149,7 +146,7 @@ var _ = Describe("Test Pod conversion", func() {
}

_, err := c.podToWorkloadEndpoint(&pod)
Expect(err).To(HaveOccurred())
Expect(err).NotTo(HaveOccurred())
})

It("should parse a Pod with no labels", func() {
Expand Down Expand Up @@ -182,7 +179,7 @@ var _ = Describe("Test Pod conversion", func() {
Expect(wep.Value.(*model.WorkloadEndpoint).Labels).To(Equal(map[string]string{"calico/k8s_ns": "default"}))
})

It("should not Parse a Pod with no NodeName", func() {
It("should Parse a Pod with no NodeName", func() {
pod := k8sapi.Pod{
ObjectMeta: k8sapi.ObjectMeta{
Name: "podA",
Expand All @@ -195,7 +192,7 @@ var _ = Describe("Test Pod conversion", func() {
}

_, err := c.podToWorkloadEndpoint(&pod)
Expect(err).To(HaveOccurred())
Expect(err).NotTo(HaveOccurred())
})

})
Expand Down
4 changes: 2 additions & 2 deletions lib/backend/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (c *KubeClient) listWorkloadEndpoints(l model.WorkloadEndpointListOptions)
ret := []*model.KVPair{}
for _, pod := range pods.Items {
// Decide if this pod should be displayed.
if !c.converter.isCalicoPod(&pod) {
if !c.converter.isReadyCalicoPod(&pod) {
continue
}

Expand All @@ -508,7 +508,7 @@ func (c *KubeClient) getWorkloadEndpoint(k model.WorkloadEndpointKey) (*model.KV
}

// Decide if this pod should be displayed.
if !c.converter.isCalicoPod(pod) {
if !c.converter.isReadyCalicoPod(pod) {
return nil, nil
}
return c.converter.podToWorkloadEndpoint(pod)
Expand Down
91 changes: 70 additions & 21 deletions lib/backend/k8s/k8s_fv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
. "github.com/onsi/gomega"

log "github.com/Sirupsen/logrus"

"github.com/projectcalico/libcalico-go/lib/backend/api"
"github.com/projectcalico/libcalico-go/lib/backend/model"
"github.com/projectcalico/libcalico-go/lib/errors"
cnet "github.com/projectcalico/libcalico-go/lib/net"

k8sapi "k8s.io/client-go/pkg/api/v1"
extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/util/wait"
)

// cb implements the callback interface required for the
Expand Down Expand Up @@ -104,17 +107,60 @@ func (c cb) ProcessUpdates() {
}
}

func (c cb) ExpectUpdate(kvps []model.KVPair) {
func (c cb) ExpectExists(kvps []model.KVPair) {
// For each Key, wait for it to exist.
for _, kvp := range kvps {
log.Infof("[TEST] Expecting key: %s", kvp.Key)
exists := false

wait.PollImmediate(1*time.Second, 60*time.Second, func() (bool, error) {
// Get the update.
c.Lock.Lock()
update, ok := c.State[kvp.Key.String()]
exists = ok
c.Lock.Unlock()

log.Infof("[TEST] Key exists? %t: %+v", ok, update)
if ok {
// Expected key to exist, and it does.
return true, nil
} else {
// Key does not yet exist.
return false, nil
}
})

// Get the update.
c.Lock.Lock()
update, ok := c.State[kvp.Key.String()]
c.Lock.Unlock()
// Expect the key to have existed.
Expect(exists).To(Equal(true), fmt.Sprintf("Expected key to exist: %s", kvp.Key))
}
}

// ExpectDeleted asserts that the provided KVPairs have been deleted
// via an update over the Syncer.
func (c cb) ExpectDeleted(kvps []model.KVPair) {
for _, kvp := range kvps {
log.Infof("[TEST] Not expecting key: %s", kvp.Key)
exists := true

wait.PollImmediate(1*time.Second, 60*time.Second, func() (bool, error) {
// Get the update.
c.Lock.Lock()
update, ok := c.State[kvp.Key.String()]
exists = ok
c.Lock.Unlock()

log.Infof("[TEST] Key exists? %t: %+v", ok, update)
if ok {
// Expected key to not exist, and it does.
return false, nil
} else {
// Expected key to not exist, and it doesn't.
return true, nil
}
})

log.Infof("[TEST] Key exists? %t: %+v", ok, update)
Expect(ok).To(Equal(true), fmt.Sprintf("Expected key to exist: %s", kvp.Key))
// Expect the key to not exist.
Expect(exists).To(Equal(false), fmt.Sprintf("Expected key not to exist: %s", kvp.Key))
}
}

Expand Down Expand Up @@ -198,7 +244,7 @@ var _ = Describe("Test Syncer API for Kubernetes backend", func() {
{Key: model.ProfileLabelsKey{model.ProfileKey{Name: expectedName}}},
}
time.Sleep(1 * time.Second)
cb.ExpectUpdate(expectedKeys)
cb.ExpectExists(expectedKeys)
})

It("should handle a Namespace without DefaultDeny", func() {
Expand Down Expand Up @@ -351,17 +397,17 @@ var _ = Describe("Test Syncer API for Kubernetes backend", func() {
}
_, err := c.clientSet.Pods("default").Create(&pod)

// Make sure we clean up after ourselves.
// Make sure we clean up after ourselves. This might fail if we reach the
// test below which deletes this pod, but that's OK.
defer func() {
log.Warnf("[TEST] Cleaning up test pod: %s", pod.ObjectMeta.Name)
err = c.clientSet.Pods("default").Delete(pod.ObjectMeta.Name, &k8sapi.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
_ = c.clientSet.Pods("default").Delete(pod.ObjectMeta.Name, &k8sapi.DeleteOptions{})
}()
By("creating a pod", func() {
By("Creating a pod", func() {
Expect(err).NotTo(HaveOccurred())
})

By("waiting for the pod to start", func() {
By("Waiting for the pod to start", func() {
// Wait up to 120s for pod to start running.
log.Warnf("[TEST] Waiting for pod %s to start", pod.ObjectMeta.Name)
for i := 0; i < 120; i++ {
Expand All @@ -378,14 +424,14 @@ var _ = Describe("Test Syncer API for Kubernetes backend", func() {
Expect(p.Status.Phase).To(Equal(k8sapi.PodRunning))
})

By("performing a List() operation", func() {
By("Performing a List() operation", func() {
// Perform List and ensure it shows up in the Calico API.
weps, err := c.List(model.WorkloadEndpointListOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(len(weps)).To(BeNumerically(">", 0))
})

By("performing a List(workloadID=pod) operation", func() {
By("Performing a List(workloadID=pod) operation", func() {
// Perform List, including a workloadID
weps, err := c.List(model.WorkloadEndpointListOptions{
WorkloadID: fmt.Sprintf("default.%s", pod.ObjectMeta.Name),
Expand All @@ -394,7 +440,7 @@ var _ = Describe("Test Syncer API for Kubernetes backend", func() {
Expect(len(weps)).To(Equal(1))
})

By("performing a Get() operation", func() {
By("Performing a Get() operation", func() {
// Perform a Get and ensure no error in the Calico API.
wep, err := c.Get(model.WorkloadEndpointKey{WorkloadID: fmt.Sprintf("default.%s", pod.ObjectMeta.Name)})
Expect(err).NotTo(HaveOccurred())
Expand All @@ -414,8 +460,7 @@ var _ = Describe("Test Syncer API for Kubernetes backend", func() {

By("Expecting an update on the Syncer API", func() {
// Expect corresponding updates over the syncer for this Pod.
time.Sleep(1 * time.Second)
cb.ExpectUpdate(expectedKeys)
cb.ExpectExists(expectedKeys)
})

By("Expecting a Syncer snapshot to include the update", func() {
Expand All @@ -424,11 +469,15 @@ var _ = Describe("Test Syncer API for Kubernetes backend", func() {
go snapshotCallbacks.ProcessUpdates()
snapshotSyncer.Start()

// Wait a bit for the snapshot to finish.
time.Sleep(2 * time.Second)
snapshotCallbacks.ExpectUpdate(expectedKeys)
// Expect the snapshot to include the right keys.
snapshotCallbacks.ExpectExists(expectedKeys)
})

By("Deleting the Pod and expecting the wep to be deleted", func() {
err = c.clientSet.Pods("default").Delete(pod.ObjectMeta.Name, &k8sapi.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
cb.ExpectDeleted(expectedKeys)
})
})

// Add a defer to wait for all pods to clean up.
Expand Down
22 changes: 17 additions & 5 deletions lib/backend/k8s/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (syn *kubeSyncer) performSnapshot() ([]model.KVPair, map[string]bool, resou
versions.podVersion = poList.ListMeta.ResourceVersion
for _, po := range poList.Items {
// Ignore any updates for pods which are not ready / valid.
if !syn.kc.converter.isCalicoPod(&po) {
if !syn.kc.converter.isReadyCalicoPod(&po) {
log.Debugf("Skipping pod %s/%s", po.ObjectMeta.Namespace, po.ObjectMeta.Name)
continue
}
Expand Down Expand Up @@ -526,10 +526,22 @@ func (syn *kubeSyncer) parsePodEvent(e watch.Event) *model.KVPair {
log.Panicf("Invalid pod event. Type: %s, Object: %+v", e.Type, e.Object)
}

// Ignore updates for Pods that aren't ready / valid.
if !syn.kc.converter.isCalicoPod(pod) {
log.Debugf("Skipping pod %s/%s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
return nil
switch e.Type {
case watch.Deleted:
// For deletes, the validity conditions are different. We only care if the update
// is not for a host-networked Pods, but don't care about IP / scheduled state.
if syn.kc.converter.isHostNetworked(pod) {
log.WithField("pod", pod.Name).Debug("Pod is host networked.")
log.Debugf("Skipping delete for pod %s/%s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
return nil
}
default:
// Ignore add/modify updates for Pods that shouldn't be shown in the Calico API.
// e.g host networked Pods, or Pods that don't yet have an IP address.
if !syn.kc.converter.isReadyCalicoPod(pod) {
log.Debugf("Skipping add/modify for pod %s/%s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
return nil
}
}

// Convert the received Pod into a KVPair.
Expand Down

0 comments on commit ab2351e

Please sign in to comment.