Skip to content

Commit

Permalink
Handle DeletedFinalStateUnknown events form cache (#1278)
Browse files Browse the repository at this point in the history
* Handle DeletedFinalStateUnknown events form cache

Signed-off-by: Christian Kruse <[email protected]>

* update changelog

Signed-off-by: Christian Kruse <[email protected]>

---------

Signed-off-by: Christian Kruse <[email protected]>
  • Loading branch information
c-kruse authored Oct 11, 2023
1 parent a76f00d commit 6bc652d
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- fix(k8sprocessor): delay deleting the metadata from owner resources [#1242]
- fix(k8sprocessor): handle missed k8s resource deletions correctly [#1277]

[#1230]: https://github.com/SumoLogic/sumologic-otel-collector/pull/1230
[#1242]: https://github.com/SumoLogic/sumologic-otel-collector/pull/1242
Expand Down
23 changes: 20 additions & 3 deletions pkg/processor/k8sprocessor/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,28 @@ func (c *WatchClient) handlePodUpdate(old, new interface{}) {

func (c *WatchClient) handlePodDelete(obj interface{}) {
observability.RecordPodDeleted()
if pod, ok := obj.(*api_v1.Pod); ok {
c.forgetPod(pod)
} else {

var pod *api_v1.Pod

switch obj := obj.(type) {
case *api_v1.Pod:
pod = obj
case cache.DeletedFinalStateUnknown:
prev, ok := obj.Obj.(*api_v1.Pod)
if !ok {
c.logger.Error(
"object received was DeletedFinalStateUnknown but did not contain api_v1.Pod",
zap.Any("received", obj),
)
return
}
pod = prev
default:
c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj))
return
}

c.forgetPod(pod)
}

func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Duration) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/processor/k8sprocessor/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)
Expand Down Expand Up @@ -1233,6 +1234,36 @@ func Test_PodsGetAddedAndDeletedFromCache(t *testing.T) {
require.NoError(t, err)
eventuallyNPodsInCache(t, 0)
})

t.Run("with deleted final state unknown", func(t *testing.T) {
pod := &api_v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
Namespace: namespace,
UID: "f15f0585-a0bc-43a3-96e4-dd2eace75392",
},
}

_, err = c.kc.CoreV1().Pods(namespace).
Create(context.Background(), pod, metav1.CreateOptions{})
require.NoError(t, err)
eventuallyNPodsInCache(t, 2)

// Rather than set up a stub Informer just for this case, bypass the
// informer + fake k8s client entirely. Manually call the delete
// handler with DeletedFinalStateUnknown.
c.handlePodDelete(cache.DeletedFinalStateUnknown{
Key: fmt.Sprintf("%s/my-pod", namespace),
Obj: pod,
})
defer func() {
err = c.kc.CoreV1().Pods(namespace).
Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
require.NoError(t, err)
}()

eventuallyNPodsInCache(t, 0)
})
}

func newTestClientWithRulesAndFilters(t *testing.T, e ExtractionRules, f Filters) (*WatchClient, *observer.ObservedLogs) {
Expand Down
71 changes: 67 additions & 4 deletions pkg/processor/k8sprocessor/kube/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,28 @@ func (op *OwnerCache) upsertNamespace(obj interface{}) {
}

func (op *OwnerCache) deleteNamespace(obj interface{}) {
namespace := obj.(*api_v1.Namespace)
var ns *api_v1.Namespace

switch obj := obj.(type) {
case *api_v1.Namespace:
ns = obj
case cache.DeletedFinalStateUnknown:
prev, ok := obj.Obj.(*api_v1.Namespace)
if !ok {
op.logger.Error(
"object received was DeletedFinalStateUnknown but did not contain api_v1.Namespace",
zap.Any("received", obj),
)
return
}
ns = prev
default:
op.logger.Error("object received was not of type api_v1.Namespace", zap.Any("received", obj))
return
}

op.nsMutex.Lock()
delete(op.namespaces, namespace.Name)
delete(op.namespaces, ns.Name)
op.nsMutex.Unlock()
}

Expand Down Expand Up @@ -326,8 +345,31 @@ func (op *OwnerCache) addOwnerInformer(
}

func (op *OwnerCache) deleteObject(obj interface{}) {
var metaObj meta_v1.Object

switch obj := obj.(type) {
case meta_v1.Object:
metaObj = obj
case cache.DeletedFinalStateUnknown:
prev, ok := obj.Obj.(meta_v1.Object)
if !ok {
op.logger.Error(
"object received was DeletedFinalStateUnknown but did not contain meta_v1.Object",
zap.Any("received", obj),
)
return
}
metaObj = prev
default:
op.logger.Error(
"object received was not of type meta_v1.Object",
zap.Any("received", obj),
)
return
}

op.ownersMutex.Lock()
delete(op.objectOwners, string(obj.(meta_v1.Object).GetUID()))
delete(op.objectOwners, string(metaObj.GetUID()))
op.ownersMutex.Unlock()
}

Expand Down Expand Up @@ -410,7 +452,28 @@ func (op *OwnerCache) deleteEndpointFromPod(pod string, endpoint string) {
}

func (op *OwnerCache) genericEndpointOp(obj interface{}, endpointFunc func(pod string, endpoint string)) {
ep := obj.(*api_v1.Endpoints)
var ep *api_v1.Endpoints

switch obj := obj.(type) {
case *api_v1.Endpoints:
ep = obj
case cache.DeletedFinalStateUnknown:
prev, ok := obj.Obj.(*api_v1.Endpoints)
if !ok {
op.logger.Error(
"object received was DeletedFinalStateUnknown but did not contain api_v1.Endpoints",
zap.Any("received", obj),
)
return
}
ep = prev
default:
op.logger.Error(
"object received was not of type api_v1.Endpoints",
zap.Any("received", obj),
)
return
}

for _, it := range ep.Subsets {
for _, addr := range it.Addresses {
Expand Down

0 comments on commit 6bc652d

Please sign in to comment.