From b0f0dda517e88ccd44ffe7962e68dd15de34a8df Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Wed, 18 Oct 2023 21:24:24 -0700 Subject: [PATCH] [processor/k8sattributes] Handle all resource deletion event types (#27847) **Description:** The k8s go client's cache expects OnDelete handlers to handle objects of type DeletedFinalStateUnknown when the cache's watch mechanism misses a delete and notices later. This changes the processor to handle such deletes as if they were normal, rather than logging an error and dropping the change. **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27632 **Testing:** Only what you see in the unit tests. I am open to suggestions, but I don't see this being a code path we can reasonably cover in the e2e test suite. Verified manually locally on a kind cluster. * Stood up two deployments loosely based off e2e testing resources, one w/ a collector built from this branch and the other docker.io/otel/opentelemetry-collector-contrib:latest. * Both included an additional container in the collector pod I used to fiddle with iptables rules. * Added rules to reject traffic to/from the kube api server * Deleted some namespaces containing deployments generating telemetry. * Restored connectivity by removing the iptables rules. * Observed the collector built from this branch was silent (aside from the junk the k8s client logs due to the broken connection) * Observed the latest ([0.87.0](https://hub.docker.com/layers/otel/opentelemetry-collector-contrib/0.87.0/images/sha256-77cdd395b828b09cb920c671966f09a87a40611aa6107443146086f2046f4a9a?context=explore)) collector logged a handful of errors for the deleted resources (api_v1.Pod, and apps_v1.ReplicaSet. I probably just didn't wait long enough for Namespace.) 
``` 2023-10-19T02:18:37.781Z error kube/client.go:236 object received was not of type api_v1.Pod {"kind": "processor", "name": "k8sattributes", "pipeline": "metrics", "received": {"Key":"src1/telemetrygen-patched-766d55cbcb-8zktr","Obj":{"metadata":{"name":"telemetrygen-patched-766d55cbcb-8zktr","namespace":"src1","uid":"be5d2268-c8b0-434d-b3b8-8b18083c7a8b","creationTimestamp":"2023-10-19T02:01:08Z","labels":{"app":"telemetrygen-patched","pod-template-hash":"766d55cbcb"},"ownerReferences":[{"apiVersion":"apps/v1","kind":"ReplicaSet","name":"telemetrygen-patched-766d55cbcb","uid":"a887d67a-d5d6-4269-b520-45dbb4f1cd82","controller":true,"blockOwnerDeletion":true}]},"spec":{"containers":[{"name":"telemetrygen","image":"localhost/telemetrygen:latest","resources":{}}],"nodeName":"manual-e2e-testing-control-plane"},"status":{"podIP":"10.244.0.56","startTime":"2023-10-19T02:01:08Z","containerStatuses":[{"name":"telemetrygen","state":{},"lastState":{},"ready":false,"restartCount":0,"image":"","imageID":"","containerID":"containerd://2821ef32cd8bf93a13414504c0f8f0c016c84be49d6ffdbd475d7e4681e90c51"}]}}}} github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube.(*WatchClient).handlePodDelete github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor@v0.87.0/internal/kube/client.go:236 k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnDelete k8s.io/client-go@v0.28.2/tools/cache/controller.go:253 ... 
2023-10-19T02:19:03.970Z error kube/client.go:868 object received was not of type apps_v1.ReplicaSet {"kind": "processor", "name": "k8sattributes", "pipeline": "metrics", "received": {"Key":"src1/telemetrygen-stable-5c444bb8b8","Obj":{"metadata":{"name":"telemetrygen-stable-5c444bb8b8","namespace":"src1","uid":"d37707ff-b308-4339-8543-a1caf5705ea8","creationTimestamp":null,"ownerReferences":[{"apiVersion":"apps/v1","kind":"Deployment","name":"telemetrygen-stable","uid":"c421276e-e1bf-40c5-85e1-e92e30363da5","controller":true,"blockOwnerDeletion":true}]},"spec":{"selector":null,"template":{"metadata":{"creationTimestamp":null},"spec":{"containers":null}}},"status":{"replicas":0}}}} github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube.(*WatchClient).handleReplicaSetDelete github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor@v0.87.0/internal/kube/client.go:868 k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnDelete k8s.io/client-go@v0.28.2/tools/cache/controller.go:253 k8s.io/client-go/tools/cache.(*processorListener).run.func1 k8s.io/client-go@v0.28.2/tools/cache/shared_informer.go:979 k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 ... ``` **Documentation:** N/A - it is not clear to me whether or not this should land on the changelog. Its impact on users is marginal. 
Signed-off-by: Christian Kruse --- .../internal/kube/client.go | 16 ++++++++-- .../internal/kube/client_test.go | 29 ++++++++++++++++++- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 019105eb2a0e..47ab6255848e 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -230,7 +230,7 @@ func (c *WatchClient) handlePodUpdate(_, newPod interface{}) { func (c *WatchClient) handlePodDelete(obj interface{}) { observability.RecordPodDeleted() - if pod, ok := obj.(*api_v1.Pod); ok { + if pod, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Pod); ok { c.forgetPod(pod) } else { c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj)) @@ -259,7 +259,7 @@ func (c *WatchClient) handleNamespaceUpdate(_, newNamespace interface{}) { func (c *WatchClient) handleNamespaceDelete(obj interface{}) { observability.RecordNamespaceDeleted() - if namespace, ok := obj.(*api_v1.Namespace); ok { + if namespace, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Namespace); ok { c.m.Lock() if ns, ok := c.Namespaces[namespace.Name]; ok { // When a namespace is deleted all the pods(and other k8s objects in that namespace) in that namespace are deleted before it. 
@@ -859,7 +859,7 @@ func (c *WatchClient) handleReplicaSetUpdate(_, newRS interface{}) { func (c *WatchClient) handleReplicaSetDelete(obj interface{}) { observability.RecordReplicaSetDeleted() - if replicaset, ok := obj.(*apps_v1.ReplicaSet); ok { + if replicaset, ok := ignoreDeletedFinalStateUnknown(obj).(*apps_v1.ReplicaSet); ok { c.m.Lock() key := string(replicaset.UID) delete(c.ReplicaSets, key) @@ -915,3 +915,13 @@ func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) { } return nil, false } + +// ignoreDeletedFinalStateUnknown returns the object wrapped in +// DeletedFinalStateUnknown. Useful in OnDelete resource event handlers that do +// not need the additional context. +func ignoreDeletedFinalStateUnknown(obj interface{}) interface{} { + if obj, ok := obj.(cache.DeletedFinalStateUnknown); ok { + return obj.Obj + } + return obj +} diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index fdaf1d5e5c3f..aa31c9f31423 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/selection" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" ) @@ -250,6 +251,14 @@ func TestReplicaSetHandler(t *testing.T) { // test delete replicaset c.handleReplicaSetDelete(updatedReplicaset) assert.Equal(t, len(c.ReplicaSets), 0) + // test delete replicaset when DeletedFinalStateUnknown + c.handleReplicaSetAdd(replicaset) + require.Equal(t, len(c.ReplicaSets), 1) + c.handleReplicaSetDelete(cache.DeletedFinalStateUnknown{ + Obj: replicaset, + }) + assert.Equal(t, len(c.ReplicaSets), 0) + } func TestPodHostNetwork(t *testing.T) { @@ -427,13 +436,14 @@ func TestPodDelete(t *testing.T) { assert.False(t, 
deleteRequest.ts.Before(tsBeforeDelete)) assert.False(t, deleteRequest.ts.After(time.Now())) + // delete when DeletedFinalStateUnknown c.deleteQueue = c.deleteQueue[:0] pod = &api_v1.Pod{} pod.Name = "podC" pod.Status.PodIP = "2.2.2.2" pod.UID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" tsBeforeDelete = time.Now() - c.handlePodDelete(pod) + c.handlePodDelete(cache.DeletedFinalStateUnknown{Obj: pod}) assert.Equal(t, 5, len(c.Pods)) assert.Equal(t, 5, len(c.deleteQueue)) deleteRequest = c.deleteQueue[0] @@ -464,6 +474,23 @@ func TestNamespaceDelete(t *testing.T) { assert.Equal(t, 2, len(c.Namespaces)) got := c.Namespaces["namespaceA"] assert.Equal(t, "namespaceA", got.Name) + // delete non-existent namespace when DeletedFinalStateUnknown + c.handleNamespaceDelete(cache.DeletedFinalStateUnknown{Obj: namespace}) + assert.Equal(t, 2, len(c.Namespaces)) + got = c.Namespaces["namespaceA"] + assert.Equal(t, "namespaceA", got.Name) + + // delete namespace A + namespace.Name = "namespaceA" + c.handleNamespaceDelete(namespace) + assert.Equal(t, 1, len(c.Namespaces)) + got = c.Namespaces["namespaceB"] + assert.Equal(t, "namespaceB", got.Name) + + // delete namespace B when DeletedFinalStateUnknown + namespace.Name = "namespaceB" + c.handleNamespaceDelete(cache.DeletedFinalStateUnknown{Obj: namespace}) + assert.Equal(t, 0, len(c.Namespaces)) } func TestDeleteQueue(t *testing.T) {