From f3d5b3b7d1a51d93665543f1de5c19a15e0ca30d Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Wed, 20 Sep 2023 16:57:15 -0700 Subject: [PATCH] fix(k8sprocessor) owner meta deletion grace period Adds a deferred deletion queue to the owner metadata cache, allowing time for telemetry data to be processed with correct metadata after the owning resource(s) have been deleted. Signed-off-by: Christian Kruse --- pkg/processor/k8sprocessor/kube/client.go | 6 +- .../k8sprocessor/kube/client_test.go | 20 +++- pkg/processor/k8sprocessor/kube/fake_owner.go | 6 +- pkg/processor/k8sprocessor/kube/owner.go | 71 +++++++++++-- pkg/processor/k8sprocessor/kube/owner_test.go | 99 +++++++++++++++++++ 5 files changed, 185 insertions(+), 17 deletions(-) diff --git a/pkg/processor/k8sprocessor/kube/client.go b/pkg/processor/k8sprocessor/kube/client.go index d5b0b65953..d125239e54 100644 --- a/pkg/processor/k8sprocessor/kube/client.go +++ b/pkg/processor/k8sprocessor/kube/client.go @@ -101,7 +101,7 @@ func New( newOwnerProviderFunc = newOwnerProvider } - c.op, err = newOwnerProviderFunc(logger, c.kc, labelSelector, fieldSelector, rules, c.Filters.Namespace) + c.op, err = newOwnerProviderFunc(logger, c.kc, labelSelector, fieldSelector, rules, c.Filters.Namespace, deleteInterval, gracePeriod) if err != nil { return nil, err } @@ -198,10 +198,6 @@ func (c *WatchClient) handlePodDelete(obj interface{}) { } else { c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj)) } - c.m.RLock() - podTableSize := len(c.Pods) - c.m.RUnlock() - observability.RecordPodTableSize(int64(podTableSize)) } func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Duration) { diff --git a/pkg/processor/k8sprocessor/kube/client_test.go b/pkg/processor/k8sprocessor/kube/client_test.go index 0facf5c396..45c747834a 100644 --- a/pkg/processor/k8sprocessor/kube/client_test.go +++ b/pkg/processor/k8sprocessor/kube/client_test.go @@ -1335,7 +1335,7 @@ func TestServiceInfoArrivesLate(t *testing.T) { // DaemonSetName: true, // DeploymentName: true, // HostName: true, -// PodUID: true, +// PodUID: true, // PodName: true, // ReplicaSetName: true, // ServiceName: true, @@ -1345,9 +1345,21 @@ func TestServiceInfoArrivesLate(t *testing.T) { // NodeName: true, // Tags: NewExtractionFieldTags(), // } -// f := Filters{} // -// c, _ := New(zap.NewNop(), e, f, newFakeAPIClientset, newFakeInformer, newFakeOwnerProvider, newFakeOwnerProvider) +// c, _ := New( +// zap.NewNop(), +// k8sconfig.APIConfig{}, +// e, +// Filters{}, +// []Association{}, +// Excludes{}, +// newFakeAPIClientset, +// NewFakeInformer, +// newFakeOwnerProvider, +// "", +// 30*time.Second, +// DefaultPodDeleteGracePeriod, +// ) // return c.(*WatchClient) //} // @@ -1399,7 +1411,7 @@ func TestServiceInfoArrivesLate(t *testing.T) { // } // // c.handlePodAdd(pod) -// _, ok := c.GetPodByIP(pod.Status.PodIP) +// _, ok := c.getPod(PodIdentifier(pod.Status.PodIP)) // require.True(b, ok) // // } diff --git a/pkg/processor/k8sprocessor/kube/fake_owner.go b/pkg/processor/k8sprocessor/kube/fake_owner.go index e91fa49205..77dec8bf75 100644 --- a/pkg/processor/k8sprocessor/kube/fake_owner.go +++ b/pkg/processor/k8sprocessor/kube/fake_owner.go @@ -15,6 +15,8 @@ package kube import ( + "time" + "go.uber.org/zap" api_v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -40,7 +42,9 @@ func newFakeOwnerProvider(logger *zap.Logger, labelSelector labels.Selector, fieldSelector fields.Selector, extractionRules ExtractionRules, - namespace string) (OwnerAPI, error) { + namespace string, + _ time.Duration, _ time.Duration, +) (OwnerAPI, error) { ownerCache := fakeOwnerCache{ labelSelector: labelSelector, fieldSelector: fieldSelector, diff --git a/pkg/processor/k8sprocessor/kube/owner.go b/pkg/processor/k8sprocessor/kube/owner.go index 7b4f755bbf..ae6126b454 100644 --- a/pkg/processor/k8sprocessor/kube/owner.go +++ b/pkg/processor/k8sprocessor/kube/owner.go @@ -17,6 +17,7 @@ package kube import ( "sort" "sync" + "time" "go.uber.org/zap" api_v1 "k8s.io/api/core/v1" @@ -39,6 +40,8 @@ type OwnerProvider func( fieldSelector fields.Selector, extractionRules ExtractionRules, namespace string, + deleteInterval time.Duration, + gracePeriod time.Duration, ) (OwnerAPI, error) // ObjectOwner keeps single entry @@ -70,13 +73,19 @@ type OwnerCache struct { namespaces map[string]*api_v1.Namespace nsMutex sync.RWMutex + deleteQueue []ownerCacheEviction + deleteMu sync.Mutex + logger *zap.Logger stopCh chan struct{} informers []cache.SharedIndexInformer } -func newOwnerCache(logger *zap.Logger) OwnerCache { +func newOwnerCache(logger *zap.Logger, + deleteInterval time.Duration, + gracePeriod time.Duration, +) OwnerCache { return OwnerCache{ objectOwners: map[string]*ObjectOwner{}, podServices: map[string][]string{}, @@ -105,9 +114,13 @@ func newOwnerProvider( labelSelector labels.Selector, fieldSelector fields.Selector, extractionRules ExtractionRules, - namespace string) (OwnerAPI, error) { + namespace string, + deleteInterval time.Duration, + gracePeriod time.Duration, +) (OwnerAPI, error) { - ownerCache := newOwnerCache(logger) + ownerCache := newOwnerCache(logger, deleteInterval, gracePeriod) + go ownerCache.deleteLoop(deleteInterval, gracePeriod) factory := informers.NewSharedInformerFactoryWithOptions(client, watchSyncPeriod, informers.WithNamespace(namespace), @@ -262,10 +275,10 @@ func (op *OwnerCache) addNamespaceInformer(factory informers.SharedInformerFacto observability.RecordOtherUpdated() op.upsertNamespace(obj) }, - DeleteFunc: func(obj interface{}) { + DeleteFunc: op.deferredDelete(func(obj interface{}) { observability.RecordOtherDeleted() op.deleteNamespace(obj) - }, + }), }) if err != nil { op.logger.Error("error adding event handler to namespace informer", zap.Error(err)) @@ -274,6 +287,16 @@ func (op *OwnerCache) addNamespaceInformer(factory informers.SharedInformerFacto op.informers = append(op.informers, informer) } +func (op *OwnerCache) deferredDelete(evict func(obj any)) func(any) { + return func(obj any) { + op.deleteMu.Lock() + op.deleteQueue = append(op.deleteQueue, ownerCacheEviction{ + ts: time.Now(), + evict: func() { evict(obj) }, + }) + op.deleteMu.Unlock() + } +} func (op *OwnerCache) addOwnerInformer( kind string, informer cache.SharedIndexInformer, @@ -288,10 +311,10 @@ func (op *OwnerCache) addOwnerInformer( cacheFunc(kind, obj) observability.RecordOtherUpdated() }, - DeleteFunc: func(obj interface{}) { + DeleteFunc: op.deferredDelete(func(obj any) { deleteFunc(obj) observability.RecordOtherDeleted() - }, + }), }) if err != nil { op.logger.Error("error adding event handler to namespace informer", zap.Error(err)) @@ -469,3 +492,37 @@ func (op *OwnerCache) GetOwners(pod *Pod) []*ObjectOwner { return objectOwners } + +func (op *OwnerCache) deleteLoop(interval time.Duration, gracePeriod time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + var cutoff int + now := time.Now() + op.deleteMu.Lock() + for i, d := range op.deleteQueue { + if d.ts.Add(gracePeriod).After(now) { + break + } + cutoff = i + 1 + } + toDelete := op.deleteQueue[:cutoff] + op.deleteQueue = op.deleteQueue[cutoff:] + op.deleteMu.Unlock() + + for _, d := range toDelete { + d.evict() + } + case <-op.stopCh: + return + } + } +} + +type ownerCacheEviction struct { + ts time.Time + evict func() +} diff --git a/pkg/processor/k8sprocessor/kube/owner_test.go b/pkg/processor/k8sprocessor/kube/owner_test.go index 579f56d28b..11052fb768 100644 --- a/pkg/processor/k8sprocessor/kube/owner_test.go +++ b/pkg/processor/k8sprocessor/kube/owner_test.go @@ -70,6 +70,8 @@ func Test_OwnerProvider_GetOwners_ReplicaSet(t *testing.T) { Tags: NewExtractionFieldTags(), }, "kube-system", + // relatively short delete interval and grace periods for expediencey + time.Millisecond*10, time.Millisecond*500, ) require.NoError(t, err) @@ -133,6 +135,15 @@ func Test_OwnerProvider_GetOwners_ReplicaSet(t *testing.T) { return true }, 5*time.Second, 5*time.Millisecond) + + err = c.AppsV1().ReplicaSets("kube-system").Delete( + context.Background(), "my-rs", metav1.DeleteOptions{}) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + owners := op.GetOwners(&Pod{OwnerReferences: &pod.OwnerReferences}) + return len(owners) == 0 + }, 5*time.Second, 5*time.Millisecond) } func Test_OwnerProvider_GetOwners_Deployment(t *testing.T) { @@ -156,6 +167,7 @@ func Test_OwnerProvider_GetOwners_Deployment(t *testing.T) { Tags: NewExtractionFieldTags(), }, "kube-system", + time.Second*30, DefaultPodDeleteGracePeriod, ) require.NoError(t, err) @@ -268,6 +280,7 @@ func Test_OwnerProvider_GetOwners_Statefulset(t *testing.T) { Tags: NewExtractionFieldTags(), }, "kube-system", + time.Second*30, DefaultPodDeleteGracePeriod, ) require.NoError(t, err) @@ -354,6 +367,7 @@ func Test_OwnerProvider_GetOwners_Daemonset(t *testing.T) { Tags: NewExtractionFieldTags(), }, "kube-system", + time.Second*30, DefaultPodDeleteGracePeriod, ) require.NoError(t, err) @@ -444,6 +458,7 @@ func Test_OwnerProvider_GetServices(t *testing.T) { Tags: NewExtractionFieldTags(), }, namespace, + time.Millisecond*10, time.Millisecond*500, ) require.NoError(t, err) @@ -590,6 +605,7 @@ func Test_OwnerProvider_GetOwners_Job(t *testing.T) { Tags: NewExtractionFieldTags(), }, "kube-system", + time.Second*30, DefaultPodDeleteGracePeriod, ) require.NoError(t, err) @@ -676,6 +692,7 @@ func Test_OwnerProvider_GetOwners_CronJob(t *testing.T) { Tags: NewExtractionFieldTags(), }, "kube-system", + time.Second*30, DefaultPodDeleteGracePeriod, ) require.NoError(t, err) @@ -766,3 +783,85 @@ func Test_OwnerProvider_GetOwners_CronJob(t *testing.T) { return true }, 5*time.Second, 5*time.Millisecond) } + +func Test_OwnerProvider_GetNamespace(t *testing.T) { + c, err := newFakeAPIClientset(k8sconfig.APIConfig{}) + require.NoError(t, err) + + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + op, err := newOwnerProvider( + logger, + c, + labels.Everything(), + fields.Everything(), + ExtractionRules{ + Namespace: true, + OwnerLookupEnabled: true, + PodUID: true, + PodName: true, + ReplicaSetName: true, + Tags: NewExtractionFieldTags(), + }, + "kube-system", + // relatively short delete interval and grace periods for expediencey + time.Millisecond*10, time.Millisecond*500, + ) + require.NoError(t, err) + + client := c.(*fake.Clientset) + replicaSetWatchEstablished := waitForWatchToBeEstablished(client, "namespaces") + + op.Start() + t.Cleanup(func() { + op.Stop() + }) + + <-replicaSetWatchEstablished + + nsUID := types.UID("fb9e6935-8936-4959-bd90-4e975a4c2b07") + _, err = c.CoreV1().Namespaces().Create(context.Background(), + &api_v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testns", + UID: nsUID, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + + pod := &api_v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + Namespace: "testns", + UID: "e98a3d3e-fde9-4b10-8f61-cc37d0357c28", + }, + } + + _, err = c.CoreV1().Pods("testns"). + Create(context.Background(), pod, metav1.CreateOptions{}) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + ns := op.GetNamespace(pod) + if ns == nil { + return false + } + + if uid := ns.UID; uid != nsUID { + t.Logf("wrong namespace UID: %v", uid) + return false + } + + return true + }, 5*time.Second, 5*time.Millisecond) + + err = c.CoreV1().Namespaces().Delete( + context.Background(), "testns", metav1.DeleteOptions{}) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + ns := op.GetNamespace(pod) + return ns == nil + }, 5*time.Second, 5*time.Millisecond) +}