diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index b531fad3..33bd8fbb 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -988,38 +988,6 @@ func (in *OpenAPIFieldRef) DeepCopy() *OpenAPIFieldRef { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ScrapePluginList) DeepCopyInto(out *ScrapePluginList) { - *out = *in - out.TypeMeta = in.TypeMeta - in.ListMeta.DeepCopyInto(&out.ListMeta) - if in.Items != nil { - in, out := &in.Items, &out.Items - *out = make([]ScrapePlugin, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PluginList. -func (in *ScrapePluginList) DeepCopy() *ScrapePluginList { - if in == nil { - return nil - } - out := new(ScrapePluginList) - in.DeepCopyInto(out) - return out -} - -// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *ScrapePluginList) DeepCopyObject() runtime.Object { - if c := in.DeepCopy(); c != nil { - return c - } - return nil -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PodFile) DeepCopyInto(out *PodFile) { *out = *in @@ -1351,6 +1319,38 @@ func (in *ScrapePlugin) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ScrapePluginList) DeepCopyInto(out *ScrapePluginList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]ScrapePlugin, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScrapePluginList. +func (in *ScrapePluginList) DeepCopy() *ScrapePluginList { + if in == nil { + return nil + } + out := new(ScrapePluginList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ScrapePluginList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ScrapePluginSpec) DeepCopyInto(out *ScrapePluginSpec) { *out = *in diff --git a/cmd/operator.go b/cmd/operator.go index d662f237..8faeb2f0 100644 --- a/cmd/operator.go +++ b/cmd/operator.go @@ -6,18 +6,11 @@ import ( commonsCtx "github.com/flanksource/commons/context" "github.com/flanksource/commons/logger" - "github.com/flanksource/kopper" - - "github.com/flanksource/config-db/api" - configsv1 "github.com/flanksource/config-db/api/v1" - v1 "github.com/flanksource/config-db/api/v1" - "github.com/flanksource/config-db/db" - "github.com/flanksource/config-db/scrapers" "github.com/flanksource/duty" - "github.com/flanksource/duty/context" dutyContext "github.com/flanksource/duty/context" "github.com/flanksource/duty/leader" "github.com/flanksource/duty/shutdown" + "github.com/flanksource/kopper" "github.com/go-logr/logr" "github.com/spf13/cobra" "go.opentelemetry.io/otel" @@ -26,6 +19,11 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth" ctrl 
"sigs.k8s.io/controller-runtime" + + "github.com/flanksource/config-db/api" + v1 "github.com/flanksource/config-db/api/v1" + "github.com/flanksource/config-db/db" + "github.com/flanksource/config-db/scrapers" ) var ( @@ -82,16 +80,18 @@ func run(ctx dutyContext.Context, args []string) error { scheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(configsv1.AddToScheme(scheme)) + utilruntime.Must(v1.AddToScheme(scheme)) registerJobs(ctx, args) - go serve(dutyCtx) scrapers.StartEventListener(ctx) + go serve(dutyCtx) + go tableUpdatesHandler(dutyCtx) + return launchKopper(ctx) } -func launchKopper(ctx context.Context) error { +func launchKopper(ctx dutyContext.Context) error { mgr, err := kopper.Manager(&kopper.ManagerOptions{ AddToSchemeFunc: v1.AddToScheme, }) @@ -118,7 +118,7 @@ func launchKopper(ctx context.Context) error { return mgr.Start(ctrl.SetupSignalHandler()) } -func PersistScrapeConfigFromCRD(ctx context.Context, scrapeConfig *v1.ScrapeConfig) error { +func PersistScrapeConfigFromCRD(ctx dutyContext.Context, scrapeConfig *v1.ScrapeConfig) error { if changed, err := db.PersistScrapeConfigFromCRD(ctx, scrapeConfig); err != nil { return err } else if changed { diff --git a/cmd/server.go b/cmd/server.go index c4331113..c59ac4a6 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -18,6 +18,7 @@ import ( dutyContext "github.com/flanksource/duty/context" dutyEcho "github.com/flanksource/duty/echo" "github.com/flanksource/duty/postgrest" + "github.com/flanksource/duty/postq/pg" "github.com/flanksource/duty/shutdown" "github.com/labstack/echo-contrib/echoprometheus" @@ -50,11 +51,26 @@ var Serve = &cobra.Command{ registerJobs(dutyCtx, args) scrapers.StartEventListener(ctx) + go tableUpdatesHandler(dutyCtx) serve(dutyCtx) + return nil }, } +// tableUpdatesHandler handles all "table_activity" pg notifications. 
+func tableUpdatesHandler(ctx dutyContext.Context) { + notifyRouter := pg.NewNotifyRouter() + go notifyRouter.Run(ctx, "table_activity") + + for range notifyRouter.GetOrCreateChannel("scrape_plugins") { + ctx.Logger.V(3).Infof("reloading plugins") + if _, err := db.ReloadAllScrapePlugins(ctx); err != nil { + logger.Errorf("failed to reload plugins: %v", err) + } + } +} + func registerJobs(ctx dutyContext.Context, configFiles []string) { go startScraperCron(ctx, configFiles) shutdown.AddHook(scrapers.Stop) diff --git a/db/scrape_plugin.go b/db/scrape_plugin.go index 6b0e2f75..d56ba814 100644 --- a/db/scrape_plugin.go +++ b/db/scrape_plugin.go @@ -1,10 +1,15 @@ package db import ( + "encoding/json" + "fmt" + "time" + v1 "github.com/flanksource/config-db/api/v1" "github.com/flanksource/duty" "github.com/flanksource/duty/context" "github.com/flanksource/duty/models" + gocache "github.com/patrickmn/go-cache" ) func PersistScrapePluginFromCRD(ctx context.Context, plugin *v1.ScrapePlugin) error { @@ -21,7 +26,33 @@ func DeleteScrapePlugin(ctx context.Context, id string) error { return ctx.DB().Model(&models.ScrapePlugin{}).Where("id = ?", id).Update("deleted_at", duty.Now()).Error } -func LoadAllPlugins(ctx context.Context) ([]models.ScrapePlugin, error) { +var cachedPlugin = gocache.New(time.Hour, time.Hour) + +func LoadAllPlugins(ctx context.Context) ([]v1.ScrapePluginSpec, error) { + if v, found := cachedPlugin.Get("only"); found { + return v.([]v1.ScrapePluginSpec), nil + } + + return ReloadAllScrapePlugins(ctx) +} + +func ReloadAllScrapePlugins(ctx context.Context) ([]v1.ScrapePluginSpec, error) { var plugins []models.ScrapePlugin - return plugins, ctx.DB().Where("deleted_at IS NULL").Find(&plugins).Error + if err := ctx.DB().Where("deleted_at IS NULL").Find(&plugins).Error; err != nil { + return nil, err + } + + specs := make([]v1.ScrapePluginSpec, 0, len(plugins)) + for _, p := range plugins { + var spec v1.ScrapePluginSpec + if err := json.Unmarshal(p.Spec, 
&spec); err != nil { + return nil, fmt.Errorf("failed to unmarshal scrape plugin spec(%s): %w", p.ID, err) + } + + specs = append(specs, spec) + } + + cachedPlugin.SetDefault("only", specs) + + return specs, nil } diff --git a/scrapers/cron.go b/scrapers/cron.go index ffbfb32f..af68e888 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -1,7 +1,6 @@ package scrapers import ( - "encoding/json" "fmt" "reflect" "sync" @@ -347,19 +346,9 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q objs = append(objs, res...) } - // TODO: maybe cache this and keep it in sync with pg notify - var plugins []v1.ScrapePluginSpec - if allPlugins, err := db.LoadAllPlugins(ctx.Context); err != nil { + plugins, err := db.LoadAllPlugins(ctx.Context) + if err != nil { return fmt.Errorf("failed to load plugins: %w", err) - } else { - for _, p := range allPlugins { - var spec v1.ScrapePluginSpec - if err := json.Unmarshal(p.Spec, &spec); err != nil { - return fmt.Errorf("failed to unmarshal scrape plugin spec: %w", err) - } - - plugins = append(plugins, spec) - } } scraperSpec := scrapeConfig.Spec.ApplyPlugin(plugins) diff --git a/scrapers/event.go b/scrapers/event.go index b67588bf..d9c11501 100644 --- a/scrapers/event.go +++ b/scrapers/event.go @@ -1,7 +1,6 @@ package scrapers import ( - "encoding/json" "fmt" "time" @@ -102,19 +101,9 @@ func incrementalScrapeFromEvent(ctx context.Context, event models.Event) error { return err } - // TODO: maybe cache this and keep it in sync with pg notify - var plugins []v1.ScrapePluginSpec - if allPlugins, err := db.LoadAllPlugins(ctx); err != nil { + plugins, err := db.LoadAllPlugins(ctx) + if err != nil { return fmt.Errorf("failed to load plugins: %w", err) - } else { - for _, p := range allPlugins { - var spec v1.ScrapePluginSpec - if err := json.Unmarshal(p.Spec, &spec); err != nil { - return fmt.Errorf("failed to unmarshal scrape plugin spec: %w", err) - } - - plugins = append(plugins, spec) - } } 
scrapeConfig.Spec = scrapeConfig.Spec.ApplyPlugin(plugins) diff --git a/scrapers/runscrapers.go b/scrapers/runscrapers.go index 873ffc20..f494d152 100644 --- a/scrapers/runscrapers.go +++ b/scrapers/runscrapers.go @@ -59,19 +59,9 @@ func RunK8sObjectsScraper(ctx api.ScrapeContext, config v1.Kubernetes, objs []*u // Run ... func Run(ctx api.ScrapeContext) ([]v1.ScrapeResult, error) { - // TODO: maybe cache this and keep it in sync with pg notify - var plugins []v1.ScrapePluginSpec - if allPlugins, err := db.LoadAllPlugins(ctx.DutyContext()); err != nil { + plugins, err := db.LoadAllPlugins(ctx.DutyContext()) + if err != nil { return nil, err - } else { - for _, p := range allPlugins { - var spec v1.ScrapePluginSpec - if err := json.Unmarshal(p.Spec, &spec); err != nil { - return nil, err - } - - plugins = append(plugins, spec) - } } var results v1.ScrapeResults