fix(k8sprocessor): owner meta deletion grace period #1256

Merged (10 commits) on Oct 9, 2023
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Released TBD

### Fixed

- fix(k8sprocessor): delay deleting the metadata from owner resources [#1242]

[Unreleased]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.86.0-sumo-0...main

## [v0.86.0-sumo-0]
6 changes: 1 addition & 5 deletions pkg/processor/k8sprocessor/kube/client.go
@@ -101,7 +101,7 @@ func New(
newOwnerProviderFunc = newOwnerProvider
}

c.op, err = newOwnerProviderFunc(logger, c.kc, labelSelector, fieldSelector, rules, c.Filters.Namespace)
c.op, err = newOwnerProviderFunc(logger, c.kc, labelSelector, fieldSelector, rules, c.Filters.Namespace, deleteInterval, gracePeriod)
if err != nil {
return nil, err
}
@@ -198,10 +198,6 @@ func (c *WatchClient) handlePodDelete(obj interface{}) {
} else {
c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj))
}
c.m.RLock()
podTableSize := len(c.Pods)
c.m.RUnlock()
observability.RecordPodTableSize(int64(podTableSize))
}

func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Duration) {
87 changes: 0 additions & 87 deletions pkg/processor/k8sprocessor/kube/client_test.go
@@ -1326,90 +1326,3 @@ func TestServiceInfoArrivesLate(t *testing.T) {
// Desired behavior: we get all three service names in response:
assert.Equal(t, "firstService, secondService, thirdService", serviceName)
}

//func newBenchmarkClient(b *testing.B) *WatchClient {
// e := ExtractionRules{
// ContainerID: true,
// ContainerImage: true,
// ContainerName: true,
// DaemonSetName: true,
// DeploymentName: true,
// HostName: true,
// PodUID: true,
// PodName: true,
// ReplicaSetName: true,
// ServiceName: true,
// StatefulSetName: true,
// StartTime: true,
// Namespace: true,
// NodeName: true,
// Tags: NewExtractionFieldTags(),
// }
// f := Filters{}
//
// c, _ := New(zap.NewNop(), e, f, newFakeAPIClientset, newFakeInformer, newFakeOwnerProvider, newFakeOwnerProvider)
// return c.(*WatchClient)
//}
//
//// benchmark actually checks what's the impact of adding new Pod, which is mostly impacted by duration of API call
//func benchmark(b *testing.B, podsPerUniqueOwner int) {
// c := newBenchmarkClient(b)
//
// b.ResetTimer()
// for i := 0; i < b.N; i++ {
// pod := &api_v1.Pod{
// ObjectMeta: meta_v1.ObjectMeta{
// Name: fmt.Sprintf("pod-number-%d", i),
// Namespace: "ns1",
// UID: types.UID(fmt.Sprintf("33333-%d", i)),
// CreationTimestamp: meta_v1.Now(),
// Labels: map[string]string{
// "label1": fmt.Sprintf("lv1-%d", i),
// "label2": "k1=v1 k5=v5 extra!",
// },
// Annotations: map[string]string{
// "annotation1": fmt.Sprintf("av%d", i),
// },
// OwnerReferences: []meta_v1.OwnerReference{
// {
// Kind: "ReplicaSet",
// Name: "foo-bar-rs",
// UID: types.UID(fmt.Sprintf("1a1658f9-7818-11e9-90f1-02324f7e0d1e-%d", i/podsPerUniqueOwner)),
// },
// },
// },
// Spec: api_v1.PodSpec{
// NodeName: "node1",
// Hostname: "auth-hostname3",
// Containers: []api_v1.Container{
// {
// Image: "auth-service-image",
// Name: "auth-service-container-name",
// },
// },
// },
// Status: api_v1.PodStatus{
// PodIP: fmt.Sprintf("%d.%d.%d.%d", (i>>24)%256, (i>>16)%256, (i>>8)%256, i%256),
// ContainerStatuses: []api_v1.ContainerStatus{
// {
// ContainerID: fmt.Sprintf("111-222-333-%d", i),
// },
// },
// },
// }
//
// c.handlePodAdd(pod)
// _, ok := c.GetPodByIP(pod.Status.PodIP)
// require.True(b, ok)
//
// }
//
//}
//
//func BenchmarkManyPodsPerOwner(b *testing.B) {
// benchmark(b, 100000)
//}
//
//func BenchmarkFewPodsPerOwner(b *testing.B) {
// benchmark(b, 10)
//}
6 changes: 5 additions & 1 deletion pkg/processor/k8sprocessor/kube/fake_owner.go
@@ -15,6 +15,8 @@
package kube

import (
"time"

"go.uber.org/zap"
api_v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -40,7 +42,9 @@ func newFakeOwnerProvider(logger *zap.Logger,
labelSelector labels.Selector,
fieldSelector fields.Selector,
extractionRules ExtractionRules,
namespace string) (OwnerAPI, error) {
namespace string,
_ time.Duration, _ time.Duration,
) (OwnerAPI, error) {
ownerCache := fakeOwnerCache{
labelSelector: labelSelector,
fieldSelector: fieldSelector,
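Any alternative OwnerProvider implementation (such as the test fake above) now has to accept the delete interval and grace period, even if it simply ignores them. Below is a minimal sketch of that pattern, using simplified placeholder types rather than the real kube package signature (ownerAPI, extractionRules, and the trimmed parameter list are illustrative only):

```go
package main

import (
	"time"

	"go.uber.org/zap"
)

// ownerAPI and extractionRules are simplified stand-ins for kube.OwnerAPI and
// kube.ExtractionRules; the real provider also receives the Kubernetes client
// and label/field selectors.
type ownerAPI interface{ Start() }
type extractionRules struct{}

// ownerProvider mirrors the widened constructor shape: the delete interval and
// grace period are now part of the contract.
type ownerProvider func(
	logger *zap.Logger,
	rules extractionRules,
	namespace string,
	deleteInterval time.Duration,
	gracePeriod time.Duration,
) (ownerAPI, error)

// noopOwnerCache does nothing; a fake only needs to satisfy ownerAPI.
type noopOwnerCache struct{}

func (noopOwnerCache) Start() {}

// newNoopOwnerProvider ignores the timing parameters with blank identifiers,
// mirroring what the fake in fake_owner.go does.
func newNoopOwnerProvider(
	_ *zap.Logger,
	_ extractionRules,
	_ string,
	_ time.Duration, _ time.Duration,
) (ownerAPI, error) {
	return noopOwnerCache{}, nil
}

// Compile-time check that the fake matches the provider function type.
var _ ownerProvider = newNoopOwnerProvider

func main() {
	api, _ := newNoopOwnerProvider(zap.NewNop(), extractionRules{}, "default", time.Minute, time.Minute)
	api.Start()
}
```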
89 changes: 78 additions & 11 deletions pkg/processor/k8sprocessor/kube/owner.go
@@ -17,6 +17,7 @@ package kube
import (
"sort"
"sync"
"time"

"go.uber.org/zap"
api_v1 "k8s.io/api/core/v1"
@@ -39,6 +40,8 @@ type OwnerProvider func(
fieldSelector fields.Selector,
extractionRules ExtractionRules,
namespace string,
deleteInterval time.Duration,
gracePeriod time.Duration,
) (OwnerAPI, error)

// ObjectOwner keeps single entry
@@ -70,6 +73,9 @@ type OwnerCache struct {
namespaces map[string]*api_v1.Namespace
nsMutex sync.RWMutex

deleteQueue []ownerCacheEviction
deleteMu sync.Mutex

logger *zap.Logger

stopCh chan struct{}
@@ -105,9 +111,13 @@ func newOwnerProvider(
labelSelector labels.Selector,
fieldSelector fields.Selector,
extractionRules ExtractionRules,
namespace string) (OwnerAPI, error) {
namespace string,
deleteInterval time.Duration,
gracePeriod time.Duration,
) (OwnerAPI, error) {

ownerCache := newOwnerCache(logger)
go ownerCache.deleteLoop(deleteInterval, gracePeriod)

factory := informers.NewSharedInformerFactoryWithOptions(client, watchSyncPeriod,
informers.WithNamespace(namespace),
@@ -262,10 +272,10 @@ func (op *OwnerCache) addNamespaceInformer(factory informers.SharedInformerFacto
observability.RecordOtherUpdated()
op.upsertNamespace(obj)
},
DeleteFunc: func(obj interface{}) {
DeleteFunc: op.deferredDelete(func(obj interface{}) {
observability.RecordOtherDeleted()
op.deleteNamespace(obj)
},
}),
})
if err != nil {
op.logger.Error("error adding event handler to namespace informer", zap.Error(err))
@@ -274,11 +284,26 @@
op.informers = append(op.informers, informer)
}

// deferredDelete returns a function that schedules an object's deletion from
// the owner cache by appending it to the cache's deleteQueue. It takes an
// evict function that contains the logic for processing the deletion.
func (op *OwnerCache) deferredDelete(evict func(obj any)) func(any) {
return func(obj any) {
op.deleteMu.Lock()
op.deleteQueue = append(op.deleteQueue, ownerCacheEviction{
ts: time.Now(),
evict: func() { evict(obj) },
})
op.deleteMu.Unlock()
}
}

func (op *OwnerCache) addOwnerInformer(
kind string,
informer cache.SharedIndexInformer,
cacheFunc func(kind string, obj interface{}),
deleteFunc func(obj interface{})) {
deleteFunc func(obj interface{}),
) {
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cacheFunc(kind, obj)
@@ -288,10 +313,10 @@
cacheFunc(kind, obj)
observability.RecordOtherUpdated()
},
DeleteFunc: func(obj interface{}) {
DeleteFunc: op.deferredDelete(func(obj any) {
deleteFunc(obj)
observability.RecordOtherDeleted()
},
}),
})
if err != nil {
op.logger.Error("error adding event handler to namespace informer", zap.Error(err))
@@ -425,13 +450,13 @@ func (op *OwnerCache) GetNamespace(pod *api_v1.Pod) *api_v1.Namespace {

func (op *OwnerCache) GetServices(podName string) []string {
op.podServicesMutex.RLock()
defer op.podServicesMutex.RUnlock()
oo, found := op.podServices[podName]
op.podServicesMutex.RUnlock()

if found {
return oo
if !found {
return []string{}
}
return []string{}

return append([]string(nil), oo...)
}

// GetOwners goes through the cached data and assigns relevant metadata for pod
@@ -469,3 +494,45 @@ func (op *OwnerCache) GetOwners(pod *Pod) []*ObjectOwner {

return objectOwners
}

// deleteLoop runs alongside the owner cache, checking for and deleting cache
// entries that have been marked for deletion for over the duration of the
// grace period.
func (op *OwnerCache) deleteLoop(interval time.Duration, gracePeriod time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
for _, d := range op.nextDeleteQueue(gracePeriod) {
d.evict()
}
case <-op.stopCh:
return
}
}
}

// nextDeleteQueue pops the evictions older than the gracePeriod from the
// cache's deleteQueue
func (op *OwnerCache) nextDeleteQueue(gracePeriod time.Duration) []ownerCacheEviction {
var cutoff int
now := time.Now()
op.deleteMu.Lock()
defer op.deleteMu.Unlock()
for i, d := range op.deleteQueue {
if d.ts.Add(gracePeriod).After(now) {
break
}
cutoff = i + 1
}
toDelete := op.deleteQueue[:cutoff]
op.deleteQueue = op.deleteQueue[cutoff:]
return toDelete
}

type ownerCacheEviction struct {
ts time.Time
evict func()
}
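
Taken together, the owner.go additions implement a small time-based eviction queue: informer delete callbacks are wrapped by deferredDelete, and the actual cache eviction only runs once an entry has been queued for longer than the grace period, checked every deleteInterval by deleteLoop. A self-contained sketch of the pattern, using illustrative names (Cache, eviction) rather than the processor's real types:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// eviction pairs the time an object was marked for deletion with the callback
// that actually removes it.
type eviction struct {
	ts    time.Time
	evict func()
}

// Cache is a stand-in for OwnerCache, carrying only the delete queue.
type Cache struct {
	mu          sync.Mutex
	deleteQueue []eviction
	stopCh      chan struct{}
}

// deferredDelete wraps a delete handler so the deletion is queued with a
// timestamp instead of being applied immediately.
func (c *Cache) deferredDelete(evict func(obj any)) func(any) {
	return func(obj any) {
		c.mu.Lock()
		c.deleteQueue = append(c.deleteQueue, eviction{ts: time.Now(), evict: func() { evict(obj) }})
		c.mu.Unlock()
	}
}

// nextDeleteQueue pops the evictions whose grace period has already elapsed.
func (c *Cache) nextDeleteQueue(gracePeriod time.Duration) []eviction {
	now := time.Now()
	c.mu.Lock()
	defer c.mu.Unlock()
	var cutoff int
	for i, d := range c.deleteQueue {
		if d.ts.Add(gracePeriod).After(now) {
			break
		}
		cutoff = i + 1
	}
	expired := c.deleteQueue[:cutoff]
	c.deleteQueue = c.deleteQueue[cutoff:]
	return expired
}

// deleteLoop periodically evicts whatever has aged past the grace period.
func (c *Cache) deleteLoop(interval, gracePeriod time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			for _, d := range c.nextDeleteQueue(gracePeriod) {
				d.evict()
			}
		case <-c.stopCh:
			return
		}
	}
}

func main() {
	c := &Cache{stopCh: make(chan struct{})}
	go c.deleteLoop(100*time.Millisecond, 500*time.Millisecond)

	onDelete := c.deferredDelete(func(obj any) { fmt.Println("evicted:", obj) })
	onDelete("namespace/foo") // queued now, evicted roughly 500ms later

	time.Sleep(time.Second)
	close(c.stopCh)
}
```

In the real processor the loop is started by newOwnerProvider and stopped through the cache's stopCh, so owner metadata remains available for telemetry that arrives shortly after the owner object is deleted.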