diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 019105eb2a0e..47ab6255848e 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -230,7 +230,7 @@ func (c *WatchClient) handlePodUpdate(_, newPod interface{}) { func (c *WatchClient) handlePodDelete(obj interface{}) { observability.RecordPodDeleted() - if pod, ok := obj.(*api_v1.Pod); ok { + if pod, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Pod); ok { c.forgetPod(pod) } else { c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj)) @@ -259,7 +259,7 @@ func (c *WatchClient) handleNamespaceUpdate(_, newNamespace interface{}) { func (c *WatchClient) handleNamespaceDelete(obj interface{}) { observability.RecordNamespaceDeleted() - if namespace, ok := obj.(*api_v1.Namespace); ok { + if namespace, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Namespace); ok { c.m.Lock() if ns, ok := c.Namespaces[namespace.Name]; ok { // When a namespace is deleted all the pods(and other k8s objects in that namespace) in that namespace are deleted before it. @@ -859,7 +859,7 @@ func (c *WatchClient) handleReplicaSetUpdate(_, newRS interface{}) { func (c *WatchClient) handleReplicaSetDelete(obj interface{}) { observability.RecordReplicaSetDeleted() - if replicaset, ok := obj.(*apps_v1.ReplicaSet); ok { + if replicaset, ok := ignoreDeletedFinalStateUnknown(obj).(*apps_v1.ReplicaSet); ok { c.m.Lock() key := string(replicaset.UID) delete(c.ReplicaSets, key) @@ -915,3 +915,13 @@ func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) { } return nil, false } + +// ignoreDeletedFinalStateUnknown returns the object wrapped in +// DeletedFinalStateUnknown. Useful in OnDelete resource event handlers that do +// not need the additional context. +func ignoreDeletedFinalStateUnknown(obj interface{}) interface{} { + if obj, ok := obj.(cache.DeletedFinalStateUnknown); ok { + return obj.Obj + } + return obj +} diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index fdaf1d5e5c3f..aa31c9f31423 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/selection" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" ) @@ -250,6 +251,14 @@ func TestReplicaSetHandler(t *testing.T) { // test delete replicaset c.handleReplicaSetDelete(updatedReplicaset) assert.Equal(t, len(c.ReplicaSets), 0) + // test delete replicaset when DeletedFinalStateUnknown + c.handleReplicaSetAdd(replicaset) + require.Equal(t, len(c.ReplicaSets), 1) + c.handleReplicaSetDelete(cache.DeletedFinalStateUnknown{ + Obj: replicaset, + }) + assert.Equal(t, len(c.ReplicaSets), 0) + } func TestPodHostNetwork(t *testing.T) { @@ -427,13 +436,14 @@ func TestPodDelete(t *testing.T) { assert.False(t, deleteRequest.ts.Before(tsBeforeDelete)) assert.False(t, deleteRequest.ts.After(time.Now())) + // delete when DeletedFinalStateUnknown c.deleteQueue = c.deleteQueue[:0] pod = &api_v1.Pod{} pod.Name = "podC" pod.Status.PodIP = "2.2.2.2" pod.UID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" tsBeforeDelete = time.Now() - c.handlePodDelete(pod) + c.handlePodDelete(cache.DeletedFinalStateUnknown{Obj: pod}) assert.Equal(t, 5, len(c.Pods)) assert.Equal(t, 5, len(c.deleteQueue)) deleteRequest = c.deleteQueue[0] @@ -464,6 +474,23 @@ func TestNamespaceDelete(t *testing.T) { assert.Equal(t, 2, len(c.Namespaces)) got := c.Namespaces["namespaceA"] assert.Equal(t, "namespaceA", got.Name) + // delete non-existent namespace when DeletedFinalStateUnknown + c.handleNamespaceDelete(cache.DeletedFinalStateUnknown{Obj: namespace}) + assert.Equal(t, 2, len(c.Namespaces)) + got = c.Namespaces["namespaceA"] + assert.Equal(t, "namespaceA", got.Name) + + // delete namespace A + namespace.Name = "namespaceA" + c.handleNamespaceDelete(namespace) + assert.Equal(t, 1, len(c.Namespaces)) + got = c.Namespaces["namespaceB"] + assert.Equal(t, "namespaceB", got.Name) + + // delete namespace B when DeletedFinalStateUnknown + namespace.Name = "namespaceB" + c.handleNamespaceDelete(cache.DeletedFinalStateUnknown{Obj: namespace}) + assert.Equal(t, 0, len(c.Namespaces)) } func TestDeleteQueue(t *testing.T) {