From 255bfc57f504a8a1d79a9589fd10394c1baa2705 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Tue, 20 Aug 2024 11:16:18 +0200 Subject: [PATCH] Create functions for cluster and container metrics --- pkg/metrics/metrics.go | 167 ++++++++++++++++------------------------- 1 file changed, 64 insertions(+), 103 deletions(-) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 12227c2..db80bd5 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -543,134 +543,95 @@ func (pms *PromMetricSync) Pods(ctx context.Context, informer kcache.SharedIndex return g.Wait() } -// Run starts syncing the prometheus metrics to the database. -// Therefore, it gets a list of the metric queries. -func (pms *PromMetricSync) Run(ctx context.Context) error { - g, ctx := errgroup.WithContext(ctx) +func (pms *PromMetricSync) Containers(ctx context.Context, informer kcache.SharedIndexInformer) error { + if !kcache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + pms.logger.Fatal("timed out waiting for caches to sync") + } - upsertClusterMetrics := make(chan database.Entity) - upsertContainerMetrics := make(chan database.Entity) + upsertMetrics := make(chan database.Entity) - for _, promQuery := range promQueriesCluster { - promQuery := promQuery + g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - for { - result, warnings, err := pms.promApiClient.Query( - ctx, - promQuery.query, - time.Time{}, - ) - if err != nil { - return errors.Wrap(err, "error querying Prometheus") - } - if len(warnings) > 0 { - fmt.Printf("Warnings: %v\n", warnings) - } - if result == nil { - fmt.Println("No results found") - continue + g.Go(func() error { + return pms.run( + ctx, + promQueriesContainer, + upsertMetrics, + func(query PromQuery, res *model.Sample) database.Entity { + if res.Value.String() == "NaN" { + return nil } - for _, res := range result.(model.Vector) { - if res.Value.String() == "NaN" { - continue - } - - //clusterId := sha1.Sum([]byte("")) - - name := "" - - if promQuery.nameLabel != "" { - name = string(res.Metric[promQuery.nameLabel]) - } + //containerId := sha1.Sum([]byte(res.Metric["namespace"] + "/" + res.Metric["pod"] + "/" + res.Metric["container"])) - newClusterMetric := &schemav1.PrometheusClusterMetric{ - Timestamp: (res.Timestamp.UnixNano() - res.Timestamp.UnixNano()%(60*1000000000)) / 1000000, - Category: promQuery.metricCategory, - Name: name, - Value: float64(res.Value), - } + name := "" - select { - case upsertClusterMetrics <- newClusterMetric: - case <-ctx.Done(): - return ctx.Err() - } + if query.nameLabel != "" { + name = string(res.Metric[query.nameLabel]) } - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(time.Second * 55): + newContainerMetric := &schemav1.PrometheusContainerMetric{ + // TODO uuid + Timestamp: (res.Timestamp.UnixNano() - res.Timestamp.UnixNano()%(60*1000000000)) / 1000000, + Category: query.metricCategory, + Name: name, + Value: float64(res.Value), } - } - }) - } - for _, promQuery := range promQueriesContainer { - promQuery := promQuery + return newContainerMetric + }, + ) + }) - g.Go(func() error { - for { - result, warnings, err := pms.promApiClient.Query( - ctx, - promQuery.query, - time.Time{}, - ) - if err != nil { - return errors.Wrap(err, "error querying Prometheus") - } - if len(warnings) > 0 { - fmt.Printf("Warnings: %v\n", warnings) - } - if result == nil { - fmt.Println("No results found") - continue - } + g.Go(func() error { + return database.NewUpsert(pms.db, database.WithStatement(pms.promMetricContainerUpsertStmt(), 5)).Stream(ctx, upsertMetrics) + }) - for _, res := range result.(model.Vector) { - if res.Value.String() == "NaN" { - continue - } + return g.Wait() +} - //containerId := sha1.Sum([]byte(res.Metric["namespace"] + "/" + res.Metric["pod"] + "/" + res.Metric["container"])) +func (pms *PromMetricSync) Clusters(ctx context.Context, informer kcache.SharedIndexInformer) error { + if !kcache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + pms.logger.Fatal("timed out waiting for caches to sync") + } - name := "" + upsertMetrics := make(chan database.Entity) - if promQuery.nameLabel != "" { - name = string(res.Metric[promQuery.nameLabel]) - } + g, ctx := errgroup.WithContext(ctx) - newContainerMetric := &schemav1.PrometheusContainerMetric{ - Timestamp: (res.Timestamp.UnixNano() - res.Timestamp.UnixNano()%(60*1000000000)) / 1000000, - Category: promQuery.metricCategory, - Name: name, - Value: float64(res.Value), - } + g.Go(func() error { + return pms.run( + ctx, + promQueriesCluster, + upsertMetrics, + func(query PromQuery, res *model.Sample) database.Entity { + if res.Value.String() == "NaN" { + return nil + } - select { - case upsertContainerMetrics <- newContainerMetric: - case <-ctx.Done(): - return ctx.Err() - } + //clusterId := sha1.Sum([]byte("")) + + name := "" + + if query.nameLabel != "" { + name = string(res.Metric[query.nameLabel]) } - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(time.Second * 55): + newClusterMetric := &schemav1.PrometheusClusterMetric{ + // TODO uuid + Timestamp: (res.Timestamp.UnixNano() - res.Timestamp.UnixNano()%(60*1000000000)) / 1000000, + Category: query.metricCategory, + Name: name, + Value: float64(res.Value), } - } - }) - } - g.Go(func() error { - return database.NewUpsert(pms.db, database.WithStatement(pms.promMetricClusterUpsertStmt(), 5)).Stream(ctx, upsertClusterMetrics) + return newClusterMetric + }, + ) }) g.Go(func() error { - return database.NewUpsert(pms.db, database.WithStatement(pms.promMetricContainerUpsertStmt(), 5)).Stream(ctx, upsertContainerMetrics) + return database.NewUpsert(pms.db, database.WithStatement(pms.promMetricClusterUpsertStmt(), 5)).Stream(ctx, upsertMetrics) }) return g.Wait()