Skip to content

Commit

Permalink
feat: cache scrape plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Dec 2, 2024
1 parent 98cbca8 commit 5bb3c7b
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 84 deletions.
64 changes: 32 additions & 32 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions cmd/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,11 @@ import (

commonsCtx "github.com/flanksource/commons/context"
"github.com/flanksource/commons/logger"
"github.com/flanksource/kopper"

"github.com/flanksource/config-db/api"
configsv1 "github.com/flanksource/config-db/api/v1"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/scrapers"
"github.com/flanksource/duty"
"github.com/flanksource/duty/context"
dutyContext "github.com/flanksource/duty/context"
"github.com/flanksource/duty/leader"
"github.com/flanksource/duty/shutdown"
"github.com/flanksource/kopper"
"github.com/go-logr/logr"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
Expand All @@ -26,6 +19,11 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/flanksource/config-db/api"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/scrapers"
)

var (
Expand Down Expand Up @@ -82,16 +80,18 @@ func run(ctx dutyContext.Context, args []string) error {

scheme := runtime.NewScheme()
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(configsv1.AddToScheme(scheme))
utilruntime.Must(v1.AddToScheme(scheme))

registerJobs(ctx, args)
go serve(dutyCtx)
scrapers.StartEventListener(ctx)

go serve(dutyCtx)
go tableUpdatesHandler(dutyCtx)

return launchKopper(ctx)
}

func launchKopper(ctx context.Context) error {
func launchKopper(ctx dutyContext.Context) error {
mgr, err := kopper.Manager(&kopper.ManagerOptions{
AddToSchemeFunc: v1.AddToScheme,
})
Expand All @@ -118,7 +118,7 @@ func launchKopper(ctx context.Context) error {
return mgr.Start(ctrl.SetupSignalHandler())
}

func PersistScrapeConfigFromCRD(ctx context.Context, scrapeConfig *v1.ScrapeConfig) error {
func PersistScrapeConfigFromCRD(ctx dutyContext.Context, scrapeConfig *v1.ScrapeConfig) error {
if changed, err := db.PersistScrapeConfigFromCRD(ctx, scrapeConfig); err != nil {
return err
} else if changed {
Expand Down
16 changes: 16 additions & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
dutyContext "github.com/flanksource/duty/context"
dutyEcho "github.com/flanksource/duty/echo"
"github.com/flanksource/duty/postgrest"
"github.com/flanksource/duty/postq/pg"
"github.com/flanksource/duty/shutdown"

"github.com/labstack/echo-contrib/echoprometheus"
Expand Down Expand Up @@ -50,11 +51,26 @@ var Serve = &cobra.Command{

registerJobs(dutyCtx, args)
scrapers.StartEventListener(ctx)
go tableUpdatesHandler(dutyCtx)
serve(dutyCtx)

return nil
},
}

// tableUpdatesHandler handles all "table_activity" pg notifications.
//
// It subscribes to the "scrape_plugins" channel and reloads the in-memory
// plugin cache whenever the scrape_plugins table changes. The loop blocks on
// the notification channel, so this is intended to run as a goroutine; it
// exits when the channel is closed.
func tableUpdatesHandler(ctx dutyContext.Context) {
	notifyRouter := pg.NewNotifyRouter()
	go notifyRouter.Run(ctx, "table_activity")

	for range notifyRouter.GetOrCreateChannel("scrape_plugins") {
		ctx.Logger.V(3).Infof("reloading plugins")
		if _, err := db.ReloadAllScrapePlugins(ctx); err != nil {
			// %w only has wrap semantics inside fmt.Errorf; in a logger it
			// renders as "%!w(...)" noise — use %v to format the error.
			logger.Errorf("failed to reload plugins: %v", err)
		}
	}
}

func registerJobs(ctx dutyContext.Context, configFiles []string) {
go startScraperCron(ctx, configFiles)
shutdown.AddHook(scrapers.Stop)
Expand Down
35 changes: 33 additions & 2 deletions db/scrape_plugin.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package db

import (
"encoding/json"
"fmt"
"time"

v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/duty"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
gocache "github.com/patrickmn/go-cache"
)

func PersistScrapePluginFromCRD(ctx context.Context, plugin *v1.ScrapePlugin) error {
Expand All @@ -21,7 +26,33 @@ func DeleteScrapePlugin(ctx context.Context, id string) error {
return ctx.DB().Model(&models.ScrapePlugin{}).Where("id = ?", id).Update("deleted_at", duty.Now()).Error
}

func LoadAllPlugins(ctx context.Context) ([]models.ScrapePlugin, error) {
var cachedPlugin = gocache.New(time.Hour, time.Hour)

// LoadAllPlugins returns the specs of all scrape plugins, serving from the
// in-memory cache when populated and falling back to a full database reload
// on a cache miss.
func LoadAllPlugins(ctx context.Context) ([]v1.ScrapePluginSpec, error) {
	cached, ok := cachedPlugin.Get("only")
	if !ok {
		// Cache miss: fetch from the database and repopulate the cache.
		return ReloadAllScrapePlugins(ctx)
	}

	return cached.([]v1.ScrapePluginSpec), nil
}

// ReloadAllScrapePlugins fetches all non-deleted scrape plugins from the
// database, unmarshals each row's spec, refreshes the cache under the
// single "only" key, and returns the specs.
//
// It returns a nil slice and an error if the query or any spec unmarshal
// fails; the cache is left untouched in that case.
func ReloadAllScrapePlugins(ctx context.Context) ([]v1.ScrapePluginSpec, error) {
	var plugins []models.ScrapePlugin
	if err := ctx.DB().Where("deleted_at IS NULL").Find(&plugins).Error; err != nil {
		return nil, err
	}

	specs := make([]v1.ScrapePluginSpec, 0, len(plugins))
	for _, p := range plugins {
		var spec v1.ScrapePluginSpec
		if err := json.Unmarshal(p.Spec, &spec); err != nil {
			return nil, fmt.Errorf("failed to unmarshal scrape plugin spec(%s): %w", p.ID, err)
		}

		specs = append(specs, spec)
	}

	// Refresh the cache so subsequent LoadAllPlugins calls are served from
	// memory until the next table_activity notification or TTL expiry.
	cachedPlugin.SetDefault("only", specs)

	return specs, nil
}
15 changes: 2 additions & 13 deletions scrapers/cron.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package scrapers

import (
"encoding/json"
"fmt"
"reflect"
"sync"
Expand Down Expand Up @@ -347,19 +346,9 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
objs = append(objs, res...)
}

// TODO: maybe cache this and keep it in sync with pg notify
var plugins []v1.ScrapePluginSpec
if allPlugins, err := db.LoadAllPlugins(ctx.Context); err != nil {
plugins, err := db.LoadAllPlugins(ctx.Context)
if err != nil {
return fmt.Errorf("failed to load plugins: %w", err)
} else {
for _, p := range allPlugins {
var spec v1.ScrapePluginSpec
if err := json.Unmarshal(p.Spec, &spec); err != nil {
return fmt.Errorf("failed to unmarshal scrape plugin spec: %w", err)
}

plugins = append(plugins, spec)
}
}

scraperSpec := scrapeConfig.Spec.ApplyPlugin(plugins)
Expand Down
15 changes: 2 additions & 13 deletions scrapers/event.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package scrapers

import (
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -102,19 +101,9 @@ func incrementalScrapeFromEvent(ctx context.Context, event models.Event) error {
return err
}

// TODO: maybe cache this and keep it in sync with pg notify
var plugins []v1.ScrapePluginSpec
if allPlugins, err := db.LoadAllPlugins(ctx); err != nil {
plugins, err := db.LoadAllPlugins(ctx)
if err != nil {
return fmt.Errorf("failed to load plugins: %w", err)
} else {
for _, p := range allPlugins {
var spec v1.ScrapePluginSpec
if err := json.Unmarshal(p.Spec, &spec); err != nil {
return fmt.Errorf("failed to unmarshal scrape plugin spec: %w", err)
}

plugins = append(plugins, spec)
}
}

scrapeConfig.Spec = scrapeConfig.Spec.ApplyPlugin(plugins)
Expand Down
14 changes: 2 additions & 12 deletions scrapers/runscrapers.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,9 @@ func RunK8sObjectsScraper(ctx api.ScrapeContext, config v1.Kubernetes, objs []*u

// Run ...
func Run(ctx api.ScrapeContext) ([]v1.ScrapeResult, error) {
// TODO: maybe cache this and keep it in sync with pg notify
var plugins []v1.ScrapePluginSpec
if allPlugins, err := db.LoadAllPlugins(ctx.DutyContext()); err != nil {
plugins, err := db.LoadAllPlugins(ctx.DutyContext())
if err != nil {
return nil, err
} else {
for _, p := range allPlugins {
var spec v1.ScrapePluginSpec
if err := json.Unmarshal(p.Spec, &spec); err != nil {
return nil, err
}

plugins = append(plugins, spec)
}
}

var results v1.ScrapeResults
Expand Down

0 comments on commit 5bb3c7b

Please sign in to comment.