feat: single job to consume watch events & resources
* fix: config_changes_config_id_fkey

* feat: ignored configs cache

* feat: priority queue
adityathebe authored Nov 26, 2024
1 parent 0dc450f commit 0d74a4e
Showing 6 changed files with 183 additions and 146 deletions.
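Before the per-file diffs, a minimal sketch of the pattern this commit adopts: both watch events and watched resource objects are pushed into a single priority queue per scraper config, and one consumer job drains that queue with a per-run cap. This is illustrative only — the `Event` and `Resource` types, the `priority` function, and `maxPerRun` are hypothetical stand-ins; only the `github.com/emirpasic/gods/queues/priorityqueue` import and the `Dequeue` loop shape correspond to the diff below, and it assumes that package's `NewWith`/`Enqueue`/`Dequeue` API.

```go
package main

import (
	"fmt"

	pq "github.com/emirpasic/gods/queues/priorityqueue"
)

// Event and Resource are hypothetical stand-ins for the two kinds of items
// the watchers produce (v1.KubernetesEvent and *unstructured.Unstructured in
// the actual commit).
type Event struct{ Reason string }
type Resource struct{ Name string }

// priority is illustrative: give events a lower value so they dequeue before
// plain resource updates (the gods priority queue dequeues the element that
// compares smallest).
func priority(v interface{}) int {
	if _, ok := v.(Event); ok {
		return 0
	}
	return 1
}

func main() {
	queue := pq.NewWith(func(a, b interface{}) int { return priority(a) - priority(b) })

	// Producers (the watchers) push both kinds of items into the same queue.
	queue.Enqueue(Resource{Name: "pod/nginx"})
	queue.Enqueue(Event{Reason: "CrashLoopBackOff"})
	queue.Enqueue(Resource{Name: "deploy/api"})

	// Single consumer: drain until the queue is empty or a per-run cap is hit,
	// splitting items by type, mirroring the Dequeue loop in the diff below.
	const maxPerRun = 100 // hypothetical cap; the commit uses kubernetes.BufferSize
	var events []Event
	var resources []Resource
	for count := 0; count < maxPerRun; count++ {
		val, more := queue.Dequeue()
		if !more {
			break
		}
		switch v := val.(type) {
		case Event:
			events = append(events, v)
		case Resource:
			resources = append(resources, v)
		}
	}

	fmt.Printf("events=%d resources=%d\n", len(events), len(resources))
}
```

In the commit itself, the producers are the Kubernetes watchers and the single consumer is the `ConsumeKubernetesWatch` job in scrapers/cron.go.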
4 changes: 2 additions & 2 deletions api/v1/interface.go
@@ -220,7 +220,7 @@ func (t *ScrapeSummary) AddChangeSummary(configType string, cs ChangeSummary) {
v.Change = &ChangeSummary{
Ignored: cs.Ignored,
Orphaned: cs.Orphaned,
ForeginKeyErrors: cs.ForeginKeyErrors,
ForeignKeyErrors: cs.ForeignKeyErrors,
}
(*t)[configType] = v
}
@@ -252,7 +252,7 @@ func (t *ScrapeSummary) AddWarning(configType, warning string) {
type ChangeSummary struct {
Orphaned map[string]int `json:"orphaned,omitempty"`
Ignored map[string]int `json:"ignored,omitempty"`
ForeginKeyErrors int `json:"foreign_key_errors,omitempty"`
ForeignKeyErrors int `json:"foreign_key_errors,omitempty"`
}

func (t ChangeSummary) IsEmpty() bool {
6 changes: 4 additions & 2 deletions db/update.go
@@ -604,9 +604,11 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum
for _, c := range newChanges {
if err := ctx.DB().Create(&c).Error; err != nil {
if !dutydb.IsForeignKeyError(err) {
return summary, fmt.Errorf("failed to create config changes: %w", dutydb.ErrorDetails(err))
return summary, fmt.Errorf("failed to create config change: %w", dutydb.ErrorDetails(err))
}
summary.AddChangeSummary(c.ConfigType, v1.ChangeSummary{ForeginKeyErrors: 1})

ctx.Errorf("failed to save config change: (config:%s, details:%v changeType:%s, externalChangeID:%s)", c.ConfigID, c.Details, c.ChangeType, lo.FromPtr(c.ExternalChangeID))
summary.AddChangeSummary(c.ConfigType, v1.ChangeSummary{ForeignKeyErrors: 1})
}
}
}
245 changes: 121 additions & 124 deletions scrapers/cron.go
@@ -8,6 +8,7 @@ import (
"sync"
"time"

pq "github.com/emirpasic/gods/queues/priorityqueue"
"github.com/flanksource/commons/collections"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/context"
@@ -124,7 +125,6 @@ func SyncScrapeConfigs(sc context.Context) {
var existing []string
for _, m := range scraperConfigsDB {
existing = append(existing, m.ID.String())
existing = append(existing, consumeKubernetesWatchResourcesJobKey(m.ID.String()))
existing = append(existing, consumeKubernetesWatchEventsJobKey(m.ID.String()))
}

@@ -252,6 +252,10 @@ func newScraperJob(sc api.ScrapeContext) *job.Job {
func scheduleScraperJob(sc api.ScrapeContext) error {
j := newScraperJob(sc)

if sc.PropertyOn(false, "disable") {
return nil
}

scrapeJobs.Store(sc.ScraperID(), j)
if err := j.AddToScheduler(scrapeJobScheduler); err != nil {
return fmt.Errorf("[%s] failed to schedule %v", j.Name, err)
@@ -272,17 +276,11 @@ func scheduleScraperJob(sc api.ScrapeContext) error {
return fmt.Errorf("failed to watch kubernetes resources: %v", err)
}

eventsWatchJob := ConsumeKubernetesWatchEventsJobFunc(sc, config)
if err := eventsWatchJob.AddToScheduler(scrapeJobScheduler); err != nil {
return fmt.Errorf("failed to schedule kubernetes watch event consumer job: %v", err)
watchConsumerJob := ConsumeKubernetesWatchJobFunc(sc, config)
if err := watchConsumerJob.AddToScheduler(scrapeJobScheduler); err != nil {
return fmt.Errorf("failed to schedule kubernetes watch consumer job: %v", err)
}
scrapeJobs.Store(consumeKubernetesWatchEventsJobKey(sc.ScraperID()), eventsWatchJob)

resourcesWatchJob := ConsumeKubernetesWatchResourcesJobFunc(sc, config)
if err := resourcesWatchJob.AddToScheduler(scrapeJobScheduler); err != nil {
return fmt.Errorf("failed to schedule kubernetes watch resources consumer job: %v", err)
}
scrapeJobs.Store(consumeKubernetesWatchResourcesJobKey(sc.ScraperID()), resourcesWatchJob)
scrapeJobs.Store(consumeKubernetesWatchEventsJobKey(sc.ScraperID()), watchConsumerJob)
}

return nil
@@ -292,12 +290,12 @@ func consumeKubernetesWatchEventsJobKey(id string) string {
return id + "-consume-kubernetes-watch-events"
}

// ConsumeKubernetesWatchEventsJobFunc returns a job that consumes kubernetes watch events
// ConsumeKubernetesWatchJobFunc returns a job that consumes kubernetes watch events
// for the given config of the scrapeconfig.
func ConsumeKubernetesWatchEventsJobFunc(sc api.ScrapeContext, config v1.Kubernetes) *job.Job {
func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes) *job.Job {
scrapeConfig := *sc.ScrapeConfig()
return &job.Job{
Name: "ConsumeKubernetesWatchEvents",
Name: "ConsumeKubernetesWatch",
Context: sc.DutyContext().WithObject(sc.ScrapeConfig().ObjectMeta),
JobHistory: true,
Singleton: true,
@@ -307,140 +305,145 @@ func ConsumeKubernetesWatchEventsJobFunc(sc api.ScrapeContext, config v1.Kuberne
ID: fmt.Sprintf("%s/%s", sc.ScrapeConfig().Namespace, sc.ScrapeConfig().Name),
ResourceType: job.ResourceTypeScraper,
Fn: func(ctx job.JobRuntime) error {
_ch, ok := kubernetes.WatchEventBuffers.Load(config.Hash())
if !ok {
return fmt.Errorf("no watcher found for config (scrapeconfig: %s) %s", scrapeConfig.GetUID(), config.Hash())
var queue *pq.Queue
if q, ok := kubernetes.WatchQueue.Load(config.Hash()); !ok {
return fmt.Errorf("no watch queue found for config (scrapeconfig: %s) %s", scrapeConfig.GetUID(), config.Hash())
} else {
queue = q.(*pq.Queue)
}

ch := _ch.(chan v1.KubernetesEvent)
events, _, _, _ := lo.Buffer(ch, len(ch))
var events []v1.KubernetesEvent
var objs []*unstructured.Unstructured
var count int
for {
val, more := queue.Dequeue()
if !more {
break
}

cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape()
cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("%s/%s", ctx.GetNamespace(), ctx.GetName()))
results, err := RunK8IncrementalScraper(cc, config, events)
if err != nil {
return err
}
// On the off chance the queue is populated faster than it's consumed
// and to keep each run short, we set a limit.
if count > kubernetes.BufferSize {
break
}

if summary, err := db.SaveResults(cc, results); err != nil {
return fmt.Errorf("failed to save results: %w", err)
} else {
ctx.History.AddDetails("scrape_summary", summary)
switch v := val.(type) {
case v1.KubernetesEvent:
events = append(events, v)
case *unstructured.Unstructured:
objs = append(objs, v)
default:
return fmt.Errorf("unexpected data in the queue: %T", v)
}
}

for i := range results {
if results[i].Error != nil {
ctx.History.AddError(results[i].Error.Error())
} else {
ctx.History.SuccessCount++
}
// NOTE: The resource watcher can return multiple objects for the same NEW resource.
// Example: if a new pod is created, we'll get that pod object multiple times for different events.
// All those resource objects are seen as distinct new config items.
// Hence, we need to use the latest one otherwise saving fails
// as we'll be trying to BATCH INSERT multiple config items with the same id.
//
// In the process, we will lose diff changes though.
// If diff changes are necessary, then we can split up the results in such
// a way that no two objects in a batch have the same id.
objs = dedup(objs)
if err := consumeResources(ctx, scrapeConfig, config, objs); err != nil {
ctx.History.AddErrorf("failed to consume resources: %v", err)
return err
}

return nil
return consumeWatchEvents(ctx, scrapeConfig, config, events)
},
}
}

func consumeKubernetesWatchResourcesJobKey(id string) string {
return id + "-consume-kubernetes-watch-resources"
}
func consumeWatchEvents(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v1.Kubernetes, events []v1.KubernetesEvent) error {
cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape()
cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("%s/%s", ctx.GetNamespace(), ctx.GetName()))
results, err := RunK8IncrementalScraper(cc, config, events)
if err != nil {
return err
}

func dedup(objs []*unstructured.Unstructured) []*unstructured.Unstructured {
var output []*unstructured.Unstructured
seen := make(map[types.UID]struct{})
if summary, err := db.SaveResults(cc, results); err != nil {
return fmt.Errorf("failed to save results: %w", err)
} else {
ctx.History.AddDetails("scrape_summary", summary)
}

// Iterate in reverse, cuz we want the latest
for i := len(objs) - 1; i >= 0; i-- {
if _, ok := seen[objs[i].GetUID()]; ok {
continue
for i := range results {
if results[i].Error != nil {
ctx.History.AddError(results[i].Error.Error())
} else {
ctx.History.SuccessCount++
}

seen[objs[i].GetUID()] = struct{}{}
output = append(output, objs[i])
}

return output
return nil
}

// ConsumeKubernetesWatchEventsJobFunc returns a job that consumes kubernetes watch events
// for the given config of the scrapeconfig.
func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kubernetes) *job.Job {
scrapeConfig := *sc.ScrapeConfig()
return &job.Job{
Name: "ConsumeKubernetesWatchResources",
Context: sc.DutyContext().WithObject(sc.ScrapeConfig().ObjectMeta),
JobHistory: true,
Singleton: true,
Retention: job.RetentionFew,
Schedule: "@every 15s",
ResourceID: string(scrapeConfig.GetUID()),
ID: fmt.Sprintf("%s/%s", sc.ScrapeConfig().Namespace, sc.ScrapeConfig().Name),
ResourceType: job.ResourceTypeScraper,
Fn: func(ctx job.JobRuntime) error {
_ch, ok := kubernetes.WatchResourceBuffer.Load(config.Hash())
if !ok {
return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash())
}
ch := _ch.(chan *unstructured.Unstructured)
objs, _, _, _ := lo.Buffer(ch, len(ch))
func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v1.Kubernetes, objs []*unstructured.Unstructured) error {
cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape()
cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName()))
results, err := RunK8sObjectsScraper(cc, config, objs)
if err != nil {
return err
}

// NOTE: The resource watcher can return multiple objects for the same NEW resource.
// Example: if a new pod is created, we'll get that pod object multiple times for different events.
// All those resource objects are seen as distinct new config items.
// Hence, we need to use the latest one otherwise saving fails
// as we'll be trying to BATCH INSERT multiple config items with the same id.
//
// In the process, we will lose diff changes though.
// If diff changes are necessary, then we can split up the results in such
// a way that no two objects in a batch have the same id.
objs = dedup(objs)
if summary, err := db.SaveResults(cc, results); err != nil {
return fmt.Errorf("failed to save %d results: %w", len(results), err)
} else {
ctx.History.AddDetails("scrape_summary", summary)
}

cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape()
cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName()))
results, err := RunK8sObjectsScraper(cc, config, objs)
if err != nil {
return err
}
for i := range results {
if results[i].Error != nil {
ctx.History.AddError(results[i].Error.Error())
} else {
ctx.History.SuccessCount++
}
}

if summary, err := db.SaveResults(cc, results); err != nil {
return fmt.Errorf("failed to save %d results: %w", len(results), err)
} else {
ctx.History.AddDetails("scrape_summary", summary)
}
_deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash())
if !ok {
return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash())
}
deleteChan := _deleteCh.(chan string)

for i := range results {
if results[i].Error != nil {
ctx.History.AddError(results[i].Error.Error())
} else {
ctx.History.SuccessCount++
}
}
if len(deleteChan) > 0 {
deletedResourcesIDs, _, _, _ := lo.Buffer(deleteChan, len(deleteChan))

_deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash())
if !ok {
return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash())
total, err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs...)
if err != nil {
return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err)
} else if total != len(deletedResourcesIDs) {
ctx.GetSpan().SetAttributes(attribute.StringSlice("deletedResourcesIDs", deletedResourcesIDs))
if cc.PropertyOn(false, "log.missing") {
ctx.Logger.Warnf("attempted to delete %d resources but only deleted %d", len(deletedResourcesIDs), total)
}
deleteChan := _deleteCh.(chan string)
}

ctx.History.SuccessCount += total
}

if len(deleteChan) > 0 {
deletedResourcesIDs, _, _, _ := lo.Buffer(deleteChan, len(deleteChan))
return nil
}

total, err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs...)
if err != nil {
return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err)
} else if total != len(deletedResourcesIDs) {
ctx.GetSpan().SetAttributes(attribute.StringSlice("deletedResourcesIDs", deletedResourcesIDs))
if sc.PropertyOn(false, "log.missing") {
ctx.Logger.Warnf("attempted to delete %d resources but only deleted %d", len(deletedResourcesIDs), total)
}
}
func dedup(objs []*unstructured.Unstructured) []*unstructured.Unstructured {
var output []*unstructured.Unstructured
seen := make(map[types.UID]struct{})

ctx.History.SuccessCount += total
}
// Iterate in reverse, cuz we want the latest
for i := len(objs) - 1; i >= 0; i-- {
if _, ok := seen[objs[i].GetUID()]; ok {
continue
}

return nil
},
seen[objs[i].GetUID()] = struct{}{}
output = append(output, objs[i])
}

return output
}

func DeleteScrapeJob(id string) {
@@ -457,10 +460,4 @@ func DeleteScrapeJob(id string) {
existingJob.Unschedule()
scrapeJobs.Delete(id)
}

if j, ok := scrapeJobs.Load(consumeKubernetesWatchResourcesJobKey(id)); ok {
existingJob := j.(*job.Job)
existingJob.Unschedule()
scrapeJobs.Delete(id)
}
}
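For reference, a small self-contained illustration (not part of the commit) of the `dedup` helper added above: when the watcher emits several snapshots of the same new resource, only the most recent snapshot per UID is kept, so the later batch insert never sees two config items with the same id. The UIDs and resource versions below are made up for the example; the `dedup` body is copied from the diff above.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/types"
)

// dedup mirrors the helper added in scrapers/cron.go: iterate in reverse so
// the latest snapshot of each UID wins.
func dedup(objs []*unstructured.Unstructured) []*unstructured.Unstructured {
	var output []*unstructured.Unstructured
	seen := make(map[types.UID]struct{})

	for i := len(objs) - 1; i >= 0; i-- {
		if _, ok := seen[objs[i].GetUID()]; ok {
			continue
		}
		seen[objs[i].GetUID()] = struct{}{}
		output = append(output, objs[i])
	}

	return output
}

// newObj builds a minimal unstructured object with just a UID and
// resourceVersion set; the values are hypothetical.
func newObj(uid, resourceVersion string) *unstructured.Unstructured {
	o := &unstructured.Unstructured{Object: map[string]interface{}{}}
	o.SetUID(types.UID(uid))
	o.SetResourceVersion(resourceVersion)
	return o
}

func main() {
	objs := []*unstructured.Unstructured{
		newObj("pod-nginx", "101"), // first snapshot of the pod
		newObj("deploy-api", "55"),
		newObj("pod-nginx", "108"), // later snapshot of the same pod — this one should win
	}

	for _, o := range dedup(objs) {
		fmt.Println(o.GetUID(), o.GetResourceVersion())
	}
	// Output (reverse iteration keeps the latest snapshot per UID):
	// pod-nginx 108
	// deploy-api 55
}
```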