Skip to content

Commit

Permalink
fix(k8sprocessor) owner meta deletion grace period
Browse files Browse the repository at this point in the history
Adds a deferred deletion queue to the owner metadata cache, allowing
time for telemetry data to be processed with correct metadata after the
owning resource(s) have been deleted.

Signed-off-by: Christian Kruse <[email protected]>
  • Loading branch information
c-kruse committed Sep 21, 2023
1 parent 50dfb0e commit f3d5b3b
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 17 deletions.
6 changes: 1 addition & 5 deletions pkg/processor/k8sprocessor/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func New(
newOwnerProviderFunc = newOwnerProvider
}

c.op, err = newOwnerProviderFunc(logger, c.kc, labelSelector, fieldSelector, rules, c.Filters.Namespace)
c.op, err = newOwnerProviderFunc(logger, c.kc, labelSelector, fieldSelector, rules, c.Filters.Namespace, deleteInterval, gracePeriod)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -198,10 +198,6 @@ func (c *WatchClient) handlePodDelete(obj interface{}) {
} else {
c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj))
}
c.m.RLock()
podTableSize := len(c.Pods)
c.m.RUnlock()
observability.RecordPodTableSize(int64(podTableSize))
}

func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Duration) {
Expand Down
20 changes: 16 additions & 4 deletions pkg/processor/k8sprocessor/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,7 @@ func TestServiceInfoArrivesLate(t *testing.T) {
// DaemonSetName: true,
// DeploymentName: true,
// HostName: true,
// PodUID: true,
// PodUID: true,
// PodName: true,
// ReplicaSetName: true,
// ServiceName: true,
Expand All @@ -1345,9 +1345,21 @@ func TestServiceInfoArrivesLate(t *testing.T) {
// NodeName: true,
// Tags: NewExtractionFieldTags(),
// }
// f := Filters{}
//
// c, _ := New(zap.NewNop(), e, f, newFakeAPIClientset, newFakeInformer, newFakeOwnerProvider, newFakeOwnerProvider)
// c, _ := New(
// zap.NewNop(),
// k8sconfig.APIConfig{},
// e,
// Filters{},
// []Association{},
// Excludes{},
// newFakeAPIClientset,
// NewFakeInformer,
// newFakeOwnerProvider,
// "",
// 30*time.Second,
// DefaultPodDeleteGracePeriod,
// )
// return c.(*WatchClient)
//}
//
Expand Down Expand Up @@ -1399,7 +1411,7 @@ func TestServiceInfoArrivesLate(t *testing.T) {
// }
//
// c.handlePodAdd(pod)
// _, ok := c.GetPodByIP(pod.Status.PodIP)
// _, ok := c.getPod(PodIdentifier(pod.Status.PodIP))
// require.True(b, ok)
//
// }
Expand Down
6 changes: 5 additions & 1 deletion pkg/processor/k8sprocessor/kube/fake_owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package kube

import (
"time"

"go.uber.org/zap"
api_v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -40,7 +42,9 @@ func newFakeOwnerProvider(logger *zap.Logger,
labelSelector labels.Selector,
fieldSelector fields.Selector,
extractionRules ExtractionRules,
namespace string) (OwnerAPI, error) {
namespace string,
_ time.Duration, _ time.Duration,
) (OwnerAPI, error) {
ownerCache := fakeOwnerCache{
labelSelector: labelSelector,
fieldSelector: fieldSelector,
Expand Down
71 changes: 64 additions & 7 deletions pkg/processor/k8sprocessor/kube/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kube
import (
"sort"
"sync"
"time"

"go.uber.org/zap"
api_v1 "k8s.io/api/core/v1"
Expand All @@ -39,6 +40,8 @@ type OwnerProvider func(
fieldSelector fields.Selector,
extractionRules ExtractionRules,
namespace string,
deleteInterval time.Duration,
gracePeriod time.Duration,
) (OwnerAPI, error)

// ObjectOwner keeps single entry
Expand Down Expand Up @@ -70,13 +73,19 @@ type OwnerCache struct {
namespaces map[string]*api_v1.Namespace
nsMutex sync.RWMutex

deleteQueue []ownerCacheEviction
deleteMu sync.Mutex

logger *zap.Logger

stopCh chan struct{}
informers []cache.SharedIndexInformer
}

func newOwnerCache(logger *zap.Logger) OwnerCache {
func newOwnerCache(logger *zap.Logger,
deleteInterval time.Duration,
gracePeriod time.Duration,
) OwnerCache {
return OwnerCache{
objectOwners: map[string]*ObjectOwner{},
podServices: map[string][]string{},
Expand Down Expand Up @@ -105,9 +114,13 @@ func newOwnerProvider(
labelSelector labels.Selector,
fieldSelector fields.Selector,
extractionRules ExtractionRules,
namespace string) (OwnerAPI, error) {
namespace string,
deleteInterval time.Duration,
gracePeriod time.Duration,
) (OwnerAPI, error) {

ownerCache := newOwnerCache(logger)
ownerCache := newOwnerCache(logger, deleteInterval, gracePeriod)
go ownerCache.deleteLoop(deleteInterval, gracePeriod)

factory := informers.NewSharedInformerFactoryWithOptions(client, watchSyncPeriod,
informers.WithNamespace(namespace),
Expand Down Expand Up @@ -262,10 +275,10 @@ func (op *OwnerCache) addNamespaceInformer(factory informers.SharedInformerFacto
observability.RecordOtherUpdated()
op.upsertNamespace(obj)
},
DeleteFunc: func(obj interface{}) {
DeleteFunc: op.deferredDelete(func(obj interface{}) {
observability.RecordOtherDeleted()
op.deleteNamespace(obj)
},
}),
})
if err != nil {
op.logger.Error("error adding event handler to namespace informer", zap.Error(err))
Expand All @@ -274,6 +287,16 @@ func (op *OwnerCache) addNamespaceInformer(factory informers.SharedInformerFacto
op.informers = append(op.informers, informer)
}

func (op *OwnerCache) deferredDelete(evict func(obj any)) func(any) {
return func(obj any) {
op.deleteMu.Lock()
op.deleteQueue = append(op.deleteQueue, ownerCacheEviction{
ts: time.Now(),
evict: func() { evict(obj) },
})
op.deleteMu.Unlock()
}
}
func (op *OwnerCache) addOwnerInformer(
kind string,
informer cache.SharedIndexInformer,
Expand All @@ -288,10 +311,10 @@ func (op *OwnerCache) addOwnerInformer(
cacheFunc(kind, obj)
observability.RecordOtherUpdated()
},
DeleteFunc: func(obj interface{}) {
DeleteFunc: op.deferredDelete(func(obj any) {
deleteFunc(obj)
observability.RecordOtherDeleted()
},
}),
})
if err != nil {
op.logger.Error("error adding event handler to namespace informer", zap.Error(err))
Expand Down Expand Up @@ -469,3 +492,37 @@ func (op *OwnerCache) GetOwners(pod *Pod) []*ObjectOwner {

return objectOwners
}

func (op *OwnerCache) deleteLoop(interval time.Duration, gracePeriod time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
var cutoff int
now := time.Now()
op.deleteMu.Lock()
for i, d := range op.deleteQueue {
if d.ts.Add(gracePeriod).After(now) {
break
}
cutoff = i + 1
}
toDelete := op.deleteQueue[:cutoff]
op.deleteQueue = op.deleteQueue[cutoff:]
op.deleteMu.Unlock()

for _, d := range toDelete {
d.evict()
}
case <-op.stopCh:
return
}
}
}

type ownerCacheEviction struct {
ts time.Time
evict func()
}
99 changes: 99 additions & 0 deletions pkg/processor/k8sprocessor/kube/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func Test_OwnerProvider_GetOwners_ReplicaSet(t *testing.T) {
Tags: NewExtractionFieldTags(),
},
"kube-system",
// relatively short delete interval and grace periods for expediencey
time.Millisecond*10, time.Millisecond*500,
)
require.NoError(t, err)

Expand Down Expand Up @@ -133,6 +135,15 @@ func Test_OwnerProvider_GetOwners_ReplicaSet(t *testing.T) {

return true
}, 5*time.Second, 5*time.Millisecond)

err = c.AppsV1().ReplicaSets("kube-system").Delete(
context.Background(), "my-rs", metav1.DeleteOptions{})
require.NoError(t, err)

assert.Eventually(t, func() bool {
owners := op.GetOwners(&Pod{OwnerReferences: &pod.OwnerReferences})
return len(owners) == 0
}, 5*time.Second, 5*time.Millisecond)
}

func Test_OwnerProvider_GetOwners_Deployment(t *testing.T) {
Expand All @@ -156,6 +167,7 @@ func Test_OwnerProvider_GetOwners_Deployment(t *testing.T) {
Tags: NewExtractionFieldTags(),
},
"kube-system",
time.Second*30, DefaultPodDeleteGracePeriod,
)
require.NoError(t, err)

Expand Down Expand Up @@ -268,6 +280,7 @@ func Test_OwnerProvider_GetOwners_Statefulset(t *testing.T) {
Tags: NewExtractionFieldTags(),
},
"kube-system",
time.Second*30, DefaultPodDeleteGracePeriod,
)
require.NoError(t, err)

Expand Down Expand Up @@ -354,6 +367,7 @@ func Test_OwnerProvider_GetOwners_Daemonset(t *testing.T) {
Tags: NewExtractionFieldTags(),
},
"kube-system",
time.Second*30, DefaultPodDeleteGracePeriod,
)
require.NoError(t, err)

Expand Down Expand Up @@ -444,6 +458,7 @@ func Test_OwnerProvider_GetServices(t *testing.T) {
Tags: NewExtractionFieldTags(),
},
namespace,
time.Millisecond*10, time.Millisecond*500,
)
require.NoError(t, err)

Expand Down Expand Up @@ -590,6 +605,7 @@ func Test_OwnerProvider_GetOwners_Job(t *testing.T) {
Tags: NewExtractionFieldTags(),
},
"kube-system",
time.Second*30, DefaultPodDeleteGracePeriod,
)
require.NoError(t, err)

Expand Down Expand Up @@ -676,6 +692,7 @@ func Test_OwnerProvider_GetOwners_CronJob(t *testing.T) {
Tags: NewExtractionFieldTags(),
},
"kube-system",
time.Second*30, DefaultPodDeleteGracePeriod,
)
require.NoError(t, err)

Expand Down Expand Up @@ -766,3 +783,85 @@ func Test_OwnerProvider_GetOwners_CronJob(t *testing.T) {
return true
}, 5*time.Second, 5*time.Millisecond)
}

func Test_OwnerProvider_GetNamespace(t *testing.T) {
c, err := newFakeAPIClientset(k8sconfig.APIConfig{})
require.NoError(t, err)

logger, err := zap.NewDevelopment()
require.NoError(t, err)

op, err := newOwnerProvider(
logger,
c,
labels.Everything(),
fields.Everything(),
ExtractionRules{
Namespace: true,
OwnerLookupEnabled: true,
PodUID: true,
PodName: true,
ReplicaSetName: true,
Tags: NewExtractionFieldTags(),
},
"kube-system",
// relatively short delete interval and grace periods for expediencey
time.Millisecond*10, time.Millisecond*500,
)
require.NoError(t, err)

client := c.(*fake.Clientset)
replicaSetWatchEstablished := waitForWatchToBeEstablished(client, "namespaces")

op.Start()
t.Cleanup(func() {
op.Stop()
})

<-replicaSetWatchEstablished

nsUID := types.UID("fb9e6935-8936-4959-bd90-4e975a4c2b07")
_, err = c.CoreV1().Namespaces().Create(context.Background(),
&api_v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "testns",
UID: nsUID,
},
}, metav1.CreateOptions{})
require.NoError(t, err)

pod := &api_v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
Namespace: "testns",
UID: "e98a3d3e-fde9-4b10-8f61-cc37d0357c28",
},
}

_, err = c.CoreV1().Pods("testns").
Create(context.Background(), pod, metav1.CreateOptions{})
require.NoError(t, err)

assert.Eventually(t, func() bool {
ns := op.GetNamespace(pod)
if ns == nil {
return false
}

if uid := ns.UID; uid != nsUID {
t.Logf("wrong namespace UID: %v", uid)
return false
}

return true
}, 5*time.Second, 5*time.Millisecond)

err = c.CoreV1().Namespaces().Delete(
context.Background(), "testns", metav1.DeleteOptions{})
require.NoError(t, err)

assert.Eventually(t, func() bool {
ns := op.GetNamespace(pod)
return ns == nil
}, 5*time.Second, 5*time.Millisecond)
}

0 comments on commit f3d5b3b

Please sign in to comment.