From 0d74a4ec7975abe28dd9c27ac34da92ae9a5a270 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 27 Nov 2024 04:51:54 +0545 Subject: [PATCH] feat: single job to consume watch events & resources * fix: config_changes_config_id_fkey * feat: ignored configs cache * feat: priority queue --- api/v1/interface.go | 4 +- db/update.go | 6 +- scrapers/cron.go | 245 ++++++++++++++-------------- scrapers/kubernetes/events_watch.go | 58 +++++-- scrapers/kubernetes/informers.go | 8 +- scrapers/kubernetes/kubernetes.go | 8 + 6 files changed, 183 insertions(+), 146 deletions(-) diff --git a/api/v1/interface.go b/api/v1/interface.go index c86524da..700b92b1 100644 --- a/api/v1/interface.go +++ b/api/v1/interface.go @@ -220,7 +220,7 @@ func (t *ScrapeSummary) AddChangeSummary(configType string, cs ChangeSummary) { v.Change = &ChangeSummary{ Ignored: cs.Ignored, Orphaned: cs.Orphaned, - ForeginKeyErrors: cs.ForeginKeyErrors, + ForeignKeyErrors: cs.ForeignKeyErrors, } (*t)[configType] = v } @@ -252,7 +252,7 @@ func (t *ScrapeSummary) AddWarning(configType, warning string) { type ChangeSummary struct { Orphaned map[string]int `json:"orphaned,omitempty"` Ignored map[string]int `json:"ignored,omitempty"` - ForeginKeyErrors int `json:"foreign_key_errors,omitempty"` + ForeignKeyErrors int `json:"foreign_key_errors,omitempty"` } func (t ChangeSummary) IsEmpty() bool { diff --git a/db/update.go b/db/update.go index 596674df..90ebdaba 100644 --- a/db/update.go +++ b/db/update.go @@ -604,9 +604,11 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum for _, c := range newChanges { if err := ctx.DB().Create(&c).Error; err != nil { if !dutydb.IsForeignKeyError(err) { - return summary, fmt.Errorf("failed to create config changes: %w", dutydb.ErrorDetails(err)) + return summary, fmt.Errorf("failed to create config change: %w", dutydb.ErrorDetails(err)) } - summary.AddChangeSummary(c.ConfigType, v1.ChangeSummary{ForeginKeyErrors: 1}) + + ctx.Errorf("failed to save config change: (config:%s, details:%v changeType:%s, externalChangeID:%s)", c.ConfigID, c.Details, c.ChangeType, lo.FromPtr(c.ExternalChangeID)) + summary.AddChangeSummary(c.ConfigType, v1.ChangeSummary{ForeignKeyErrors: 1}) } } } diff --git a/scrapers/cron.go b/scrapers/cron.go index eb6b4218..da86c896 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -8,6 +8,7 @@ import ( "sync" "time" + pq "github.com/emirpasic/gods/queues/priorityqueue" "github.com/flanksource/commons/collections" "github.com/flanksource/commons/logger" "github.com/flanksource/duty/context" @@ -124,7 +125,6 @@ func SyncScrapeConfigs(sc context.Context) { var existing []string for _, m := range scraperConfigsDB { existing = append(existing, m.ID.String()) - existing = append(existing, consumeKubernetesWatchResourcesJobKey(m.ID.String())) existing = append(existing, consumeKubernetesWatchEventsJobKey(m.ID.String())) } @@ -252,6 +252,10 @@ func newScraperJob(sc api.ScrapeContext) *job.Job { func scheduleScraperJob(sc api.ScrapeContext) error { j := newScraperJob(sc) + if sc.PropertyOn(false, "disable") { + return nil + } + scrapeJobs.Store(sc.ScraperID(), j) if err := j.AddToScheduler(scrapeJobScheduler); err != nil { return fmt.Errorf("[%s] failed to schedule %v", j.Name, err) @@ -272,17 +276,11 @@ func scheduleScraperJob(sc api.ScrapeContext) error { return fmt.Errorf("failed to watch kubernetes resources: %v", err) } - eventsWatchJob := ConsumeKubernetesWatchEventsJobFunc(sc, config) - if err := eventsWatchJob.AddToScheduler(scrapeJobScheduler); err != 
nil { - return fmt.Errorf("failed to schedule kubernetes watch event consumer job: %v", err) + watchConsumerJob := ConsumeKubernetesWatchJobFunc(sc, config) + if err := watchConsumerJob.AddToScheduler(scrapeJobScheduler); err != nil { + return fmt.Errorf("failed to schedule kubernetes watch consumer job: %v", err) } - scrapeJobs.Store(consumeKubernetesWatchEventsJobKey(sc.ScraperID()), eventsWatchJob) - - resourcesWatchJob := ConsumeKubernetesWatchResourcesJobFunc(sc, config) - if err := resourcesWatchJob.AddToScheduler(scrapeJobScheduler); err != nil { - return fmt.Errorf("failed to schedule kubernetes watch resources consumer job: %v", err) - } - scrapeJobs.Store(consumeKubernetesWatchResourcesJobKey(sc.ScraperID()), resourcesWatchJob) + scrapeJobs.Store(consumeKubernetesWatchEventsJobKey(sc.ScraperID()), watchConsumerJob) } return nil @@ -292,12 +290,12 @@ func consumeKubernetesWatchEventsJobKey(id string) string { return id + "-consume-kubernetes-watch-events" } -// ConsumeKubernetesWatchEventsJobFunc returns a job that consumes kubernetes watch events +// ConsumeKubernetesWatchJobFunc returns a job that consumes kubernetes watch events // for the given config of the scrapeconfig. -func ConsumeKubernetesWatchEventsJobFunc(sc api.ScrapeContext, config v1.Kubernetes) *job.Job { +func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes) *job.Job { scrapeConfig := *sc.ScrapeConfig() return &job.Job{ - Name: "ConsumeKubernetesWatchEvents", + Name: "ConsumeKubernetesWatch", Context: sc.DutyContext().WithObject(sc.ScrapeConfig().ObjectMeta), JobHistory: true, Singleton: true, @@ -307,140 +305,145 @@ func ConsumeKubernetesWatchEventsJobFunc(sc api.ScrapeContext, config v1.Kuberne ID: fmt.Sprintf("%s/%s", sc.ScrapeConfig().Namespace, sc.ScrapeConfig().Name), ResourceType: job.ResourceTypeScraper, Fn: func(ctx job.JobRuntime) error { - _ch, ok := kubernetes.WatchEventBuffers.Load(config.Hash()) - if !ok { - return fmt.Errorf("no watcher found for config (scrapeconfig: %s) %s", scrapeConfig.GetUID(), config.Hash()) + var queue *pq.Queue + if q, ok := kubernetes.WatchQueue.Load(config.Hash()); !ok { + return fmt.Errorf("no watch queue found for config (scrapeconfig: %s) %s", scrapeConfig.GetUID(), config.Hash()) + } else { + queue = q.(*pq.Queue) } - ch := _ch.(chan v1.KubernetesEvent) - events, _, _, _ := lo.Buffer(ch, len(ch)) + var events []v1.KubernetesEvent + var objs []*unstructured.Unstructured + var count int + for { + val, more := queue.Dequeue() + if !more { + break + } - cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape() - cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("%s/%s", ctx.GetNamespace(), ctx.GetName())) - results, err := RunK8IncrementalScraper(cc, config, events) - if err != nil { - return err - } + // On the off chance the queue is populated faster than it's consumed + // and to keep each run short, we set a limit. 
+ if count > kubernetes.BufferSize {
+ break
+ }

- if summary, err := db.SaveResults(cc, results); err != nil {
- return fmt.Errorf("failed to save results: %w", err)
- } else {
- ctx.History.AddDetails("scrape_summary", summary)
+ switch v := val.(type) {
+ case v1.KubernetesEvent:
+ events = append(events, v)
+ case *unstructured.Unstructured:
+ objs = append(objs, v)
+ default:
+ return fmt.Errorf("unexpected data in the queue: %T", v)
+ }
+
+ count++
 }

- for i := range results {
- if results[i].Error != nil {
- ctx.History.AddError(results[i].Error.Error())
- } else {
- ctx.History.SuccessCount++
- }
+ // NOTE: The resource watcher can return multiple objects for the same NEW resource.
+ // Example: if a new pod is created, we'll get that pod object multiple times for different events.
+ // All those resource objects are seen as distinct new config items.
+ // Hence, we need to use the latest one; otherwise saving fails
+ // as we'll be trying to BATCH INSERT multiple config items with the same id.
+ //
+ // In the process, we will lose diff changes though.
+ // If diff changes are necessary, then we can split up the results in such
+ // a way that no two objects in a batch have the same id.
+ objs = dedup(objs)
+ if err := consumeResources(ctx, scrapeConfig, config, objs); err != nil {
+ ctx.History.AddErrorf("failed to consume resources: %v", err)
+ return err
 }

- return nil
+ return consumeWatchEvents(ctx, scrapeConfig, config, events)
 },
 }
 }

-func consumeKubernetesWatchResourcesJobKey(id string) string {
- return id + "-consume-kubernetes-watch-resources"
-}
+func consumeWatchEvents(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v1.Kubernetes, events []v1.KubernetesEvent) error {
+ cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape()
+ cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("%s/%s", ctx.GetNamespace(), ctx.GetName()))
+ results, err := RunK8IncrementalScraper(cc, config, events)
+ if err != nil {
+ return err
+ }

-func dedup(objs []*unstructured.Unstructured) []*unstructured.Unstructured {
- var output []*unstructured.Unstructured
- seen := make(map[types.UID]struct{})
+ if summary, err := db.SaveResults(cc, results); err != nil {
+ return fmt.Errorf("failed to save results: %w", err)
+ } else {
+ ctx.History.AddDetails("scrape_summary", summary)
+ }

- // Iterate in reverse, cuz we want the latest
- for i := len(objs) - 1; i >= 0; i-- {
- if _, ok := seen[objs[i].GetUID()]; ok {
- continue
+ for i := range results {
+ if results[i].Error != nil {
+ ctx.History.AddError(results[i].Error.Error())
+ } else {
+ ctx.History.SuccessCount++
 }
-
- seen[objs[i].GetUID()] = struct{}{}
- output = append(output, objs[i])
 }

- return output
+ return nil
 }

-// ConsumeKubernetesWatchEventsJobFunc returns a job that consumes kubernetes watch events
-// for the given config of the scrapeconfig.
-func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kubernetes) *job.Job { - scrapeConfig := *sc.ScrapeConfig() - return &job.Job{ - Name: "ConsumeKubernetesWatchResources", - Context: sc.DutyContext().WithObject(sc.ScrapeConfig().ObjectMeta), - JobHistory: true, - Singleton: true, - Retention: job.RetentionFew, - Schedule: "@every 15s", - ResourceID: string(scrapeConfig.GetUID()), - ID: fmt.Sprintf("%s/%s", sc.ScrapeConfig().Namespace, sc.ScrapeConfig().Name), - ResourceType: job.ResourceTypeScraper, - Fn: func(ctx job.JobRuntime) error { - _ch, ok := kubernetes.WatchResourceBuffer.Load(config.Hash()) - if !ok { - return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash()) - } - ch := _ch.(chan *unstructured.Unstructured) - objs, _, _, _ := lo.Buffer(ch, len(ch)) +func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v1.Kubernetes, objs []*unstructured.Unstructured) error { + cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape() + cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName())) + results, err := RunK8sObjectsScraper(cc, config, objs) + if err != nil { + return err + } - // NOTE: The resource watcher can return multiple objects for the same NEW resource. - // Example: if a new pod is created, we'll get that pod object multiple times for different events. - // All those resource objects are seen as distinct new config items. - // Hence, we need to use the latest one otherwise saving fails - // as we'll be trying to BATCH INSERT multiple config items with the same id. - // - // In the process, we will lose diff changes though. - // If diff changes are necessary, then we can split up the results in such - // a way that no two objects in a batch have the same id. 
- objs = dedup(objs) + if summary, err := db.SaveResults(cc, results); err != nil { + return fmt.Errorf("failed to save %d results: %w", len(results), err) + } else { + ctx.History.AddDetails("scrape_summary", summary) + } - cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape() - cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName())) - results, err := RunK8sObjectsScraper(cc, config, objs) - if err != nil { - return err - } + for i := range results { + if results[i].Error != nil { + ctx.History.AddError(results[i].Error.Error()) + } else { + ctx.History.SuccessCount++ + } + } - if summary, err := db.SaveResults(cc, results); err != nil { - return fmt.Errorf("failed to save %d results: %w", len(results), err) - } else { - ctx.History.AddDetails("scrape_summary", summary) - } + _deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash()) + if !ok { + return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash()) + } + deleteChan := _deleteCh.(chan string) - for i := range results { - if results[i].Error != nil { - ctx.History.AddError(results[i].Error.Error()) - } else { - ctx.History.SuccessCount++ - } - } + if len(deleteChan) > 0 { + deletedResourcesIDs, _, _, _ := lo.Buffer(deleteChan, len(deleteChan)) - _deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash()) - if !ok { - return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash()) + total, err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs...) + if err != nil { + return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err) + } else if total != len(deletedResourcesIDs) { + ctx.GetSpan().SetAttributes(attribute.StringSlice("deletedResourcesIDs", deletedResourcesIDs)) + if cc.PropertyOn(false, "log.missing") { + ctx.Logger.Warnf("attempted to delete %d resources but only deleted %d", len(deletedResourcesIDs), total) } - deleteChan := _deleteCh.(chan string) + } + + ctx.History.SuccessCount += total + } - if len(deleteChan) > 0 { - deletedResourcesIDs, _, _, _ := lo.Buffer(deleteChan, len(deleteChan)) + return nil +} - total, err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs...) 
- if err != nil {
- return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err)
- } else if total != len(deletedResourcesIDs) {
- ctx.GetSpan().SetAttributes(attribute.StringSlice("deletedResourcesIDs", deletedResourcesIDs))
- if sc.PropertyOn(false, "log.missing") {
- ctx.Logger.Warnf("attempted to delete %d resources but only deleted %d", len(deletedResourcesIDs), total)
- }
- }
+func dedup(objs []*unstructured.Unstructured) []*unstructured.Unstructured {
+ var output []*unstructured.Unstructured
+ seen := make(map[types.UID]struct{})

- ctx.History.SuccessCount += total
- }
+ // Iterate in reverse because we want the latest version of each object
+ for i := len(objs) - 1; i >= 0; i-- {
+ if _, ok := seen[objs[i].GetUID()]; ok {
+ continue
+ }

- return nil
- },
+ seen[objs[i].GetUID()] = struct{}{}
+ output = append(output, objs[i])
 }
+
+ return output
 }

 func DeleteScrapeJob(id string) {
@@ -457,10 +460,4 @@ func DeleteScrapeJob(id string) {
 existingJob.Unschedule()
 scrapeJobs.Delete(id)
 }
-
- if j, ok := scrapeJobs.Load(consumeKubernetesWatchResourcesJobKey(id)); ok {
- existingJob := j.(*job.Job)
- existingJob.Unschedule()
- scrapeJobs.Delete(id)
- }
 }
diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go
index 4f7f4d79..e1b72bf9 100644
--- a/scrapers/kubernetes/events_watch.go
+++ b/scrapers/kubernetes/events_watch.go
@@ -4,12 +4,14 @@ import (
 "encoding/json"
 "fmt"
 "sync"
+ "time"

 "github.com/samber/lo"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 "k8s.io/apimachinery/pkg/watch"

+ pq "github.com/emirpasic/gods/queues/priorityqueue"
 "github.com/flanksource/config-db/api"
 v1 "github.com/flanksource/config-db/api/v1"
 )

@@ -18,13 +20,8 @@ var (
 // BufferSize is the size of the channel that buffers kubernetes watch events
 BufferSize = 5000

- // WatchEventBuffers stores a sync buffer per kubernetes config
- WatchEventBuffers = sync.Map{}
-
- WatchResourceBufferSize = 5000
-
- // WatchEventBuffers stores a sync buffer per kubernetes config
- WatchResourceBuffer = sync.Map{}
+ // WatchQueue stores a priority queue per kubernetes config
+ WatchQueue = sync.Map{}

 // DeleteResourceBuffer stores a buffer per kubernetes config
 // that contains the ids of resources that have been deleted.
@@ -49,12 +46,40 @@ func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj a return &unstructured.Unstructured{Object: m}, nil } +func pqComparator(a, b any) int { + var aTimestamp, bTimestamp time.Time + + switch v := a.(type) { + case v1.KubernetesEvent: + aTimestamp = v.Metadata.GetCreationTimestamp().Time + case *unstructured.Unstructured: + aTimestamp = v.GetCreationTimestamp().Time + } + + switch v := b.(type) { + case v1.KubernetesEvent: + bTimestamp = v.Metadata.GetCreationTimestamp().Time + case *unstructured.Unstructured: + bTimestamp = v.GetCreationTimestamp().Time + } + + if aTimestamp.Before(bTimestamp) { + return -1 + } else if aTimestamp.Equal(bTimestamp) { + return 0 + } else { + return 1 + } +} + // WatchResources watches Kubernetes resources with shared informers func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { - buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize)) - WatchResourceBuffer.Store(config.Hash(), buffer) + priorityQueue := pq.NewWith(pqComparator) + if loaded, ok := WatchQueue.LoadOrStore(config.Hash(), priorityQueue); ok { + priorityQueue = loaded.(*pq.Queue) + } - deleteBuffer := make(chan string, WatchResourceBufferSize) + deleteBuffer := make(chan string, BufferSize) DeleteResourceBuffer.Store(config.Hash(), deleteBuffer) if config.Kubeconfig != nil { @@ -67,7 +92,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { } for _, watchResource := range lo.Uniq(config.Watch) { - if err := globalSharedInformerManager.Register(ctx, watchResource, buffer, deleteBuffer); err != nil { + if err := globalSharedInformerManager.Register(ctx, watchResource, priorityQueue, deleteBuffer); err != nil { return fmt.Errorf("failed to register informer: %w", err) } } @@ -87,8 +112,10 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { // WatchEvents watches Kubernetes events for any config changes & fetches // the referenced config items in batches. func WatchEvents(ctx api.ScrapeContext, config v1.Kubernetes) error { - buffer := make(chan v1.KubernetesEvent, ctx.DutyContext().Properties().Int("kubernetes.watch.events.bufferSize", BufferSize)) - WatchEventBuffers.Store(config.Hash(), buffer) + priorityQueue := pq.NewWith(pqComparator) + if loaded, ok := WatchQueue.LoadOrStore(config.Hash(), priorityQueue); ok { + priorityQueue = loaded.(*pq.Queue) + } if config.Kubeconfig != nil { var err error @@ -127,11 +154,14 @@ func WatchEvents(ctx api.ScrapeContext, config v1.Kubernetes) error { continue } + // NOTE: Involved objects do not have labels. + // As a result, we have to make use of the ignoredConfigsCache to filter out events of resources that have been excluded + // with labels. 
if config.Exclusions.Filter(event.InvolvedObject.Name, event.InvolvedObject.Namespace, event.InvolvedObject.Kind, nil) { continue } - buffer <- event + priorityQueue.Enqueue(event) } return nil diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go index 6c298d8c..1acb9805 100644 --- a/scrapers/kubernetes/informers.go +++ b/scrapers/kubernetes/informers.go @@ -5,6 +5,7 @@ import ( "strings" "sync" + pq "github.com/emirpasic/gods/queues/priorityqueue" "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" v1 "github.com/flanksource/config-db/api/v1" @@ -12,7 +13,6 @@ import ( "github.com/flanksource/duty/models" "github.com/google/uuid" "github.com/samber/lo" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" @@ -37,7 +37,7 @@ type SharedInformerManager struct { type DeleteObjHandler func(ctx context.Context, id string) error -func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1.KubernetesResourceToWatch, buffer chan<- *unstructured.Unstructured, deleteBuffer chan<- string) error { +func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1.KubernetesResourceToWatch, queue *pq.Queue, deleteBuffer chan<- string) error { apiVersion, kind := watchResource.ApiVersion, watchResource.Kind informer, stopper, isNew := t.getOrCreate(ctx, apiVersion, kind) @@ -65,7 +65,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1 if ctx.Properties().On(false, "scraper.log.items") { ctx.Logger.V(4).Infof("added: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) } - buffer <- u + queue.Enqueue(u) }, UpdateFunc: func(oldObj any, newObj any) { u, err := getUnstructuredFromInformedObj(watchResource, newObj) @@ -77,7 +77,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1 if ctx.Properties().On(false, "scraper.log.items") { ctx.Logger.V(3).Infof("updated: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) } - buffer <- u + queue.Enqueue(u) }, DeleteFunc: func(obj any) { u, err := getUnstructuredFromInformedObj(watchResource, obj) diff --git a/scrapers/kubernetes/kubernetes.go b/scrapers/kubernetes/kubernetes.go index 681588ec..1751645d 100644 --- a/scrapers/kubernetes/kubernetes.go +++ b/scrapers/kubernetes/kubernetes.go @@ -6,6 +6,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/Jeffail/gabs/v2" @@ -93,6 +94,10 @@ func (kubernetes KubernetesScraper) IncrementalEventScrape( ) for _, event := range events { + if _, ok := ignoredConfigsCache.Load(event.InvolvedObject.UID); ok { + continue + } + if eventObj, err := event.ToUnstructured(); err != nil { ctx.DutyContext().Errorf("failed to convert event to unstructured: %v", err) continue @@ -173,6 +178,8 @@ func (kubernetes KubernetesScraper) Scrape(ctx api.ScrapeContext) v1.ScrapeResul return results } +var ignoredConfigsCache = sync.Map{} + // ExtractResults extracts scrape results from the given list of kuberenetes objects. // - withCluster: if true, will create & add a scrape result for the kubernetes cluster. func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v1.ScrapeResults { @@ -214,6 +221,7 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v ctx.Warnf("failed to ignore obj[%s]: %v", obj.GetName(), err) continue } else if ignore { + ignoredConfigsCache.Store(obj.GetUID(), struct{}{}) continue }
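
Note on ordering: ConsumeKubernetesWatchJobFunc drains a single gods priority queue that holds both v1.KubernetesEvent and *unstructured.Unstructured values, and pqComparator orders them by creation timestamp, oldest first. Below is a minimal, self-contained sketch of that ordering (not part of this patch); the `item` type is a hypothetical stand-in for the two real payload types:

package main

import (
	"fmt"
	"time"

	pq "github.com/emirpasic/gods/queues/priorityqueue"
)

// item stands in for the queue's real payloads
// (v1.KubernetesEvent and *unstructured.Unstructured).
type item struct {
	name    string
	created time.Time
}

func main() {
	// Mirrors pqComparator: a negative result means `a` is dequeued first,
	// so the earliest creation timestamp wins.
	queue := pq.NewWith(func(a, b interface{}) int {
		at, bt := a.(item).created, b.(item).created
		switch {
		case at.Before(bt):
			return -1
		case at.Equal(bt):
			return 0
		default:
			return 1
		}
	})

	now := time.Now()
	queue.Enqueue(item{"newest", now})
	queue.Enqueue(item{"oldest", now.Add(-2 * time.Minute)})
	queue.Enqueue(item{"middle", now.Add(-1 * time.Minute)})

	// Dequeue returns (value, ok), the same shape the consumer job relies on.
	for {
		val, ok := queue.Dequeue()
		if !ok {
			break
		}
		fmt.Println(val.(item).name) // prints: oldest, middle, newest
	}
}

Because the comparator makes the queue behave as a min-heap on creation time, a backlog that accumulates between job runs is consumed in roughly chronological order, regardless of the order in which the informers enqueued the items.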