diff --git a/collectors/monitoring_collector.go b/collectors/monitoring_collector.go index bdbeb057..68e44e25 100644 --- a/collectors/monitoring_collector.go +++ b/collectors/monitoring_collector.go @@ -29,6 +29,59 @@ type MonitoringCollector struct { lastScrapeDurationSecondsMetric prometheus.Gauge collectorFillMissingLabels bool monitoringDropDelegatedProjects bool + + cache *CollectionCache +} + +type CollectionCache struct { + // This map holds the read-only result of a collection run + // It will be served from the promethus scrape endpoint until the next + // collection is complete. + cachedTimeSeries map[string]*TimeSeriesMetrics + + // This map holds the (potentially incomplete) metrics that have been collected. + // Once completed it will replace the `cachedTimeSeries` and will start being served. + activeTimeSeries map[string]*TimeSeriesMetrics + + // Indicates whether there is a collection currently running, and populating `activeTimeSeries` + // at the moment. + collectionActive bool + + // Guards `activeTimeSeries` and `collectionActive` + mu sync.Mutex +} + +// Update the cache state to indicate that a collection has started +func (c *CollectionCache) markCollectionStarted() { + log.Debugf("markCollectionStarted") + c.mu.Lock() + c.collectionActive = true + c.mu.Unlock() +} + +// Update the cache state to indicate that a collection has completed +func (c *CollectionCache) markCollectionCompleted() { + log.Debugf("markCollectionCompleted") + c.mu.Lock() + defer c.mu.Unlock() + collected := c.activeTimeSeries + c.cachedTimeSeries = collected + c.activeTimeSeries = make(map[string]*TimeSeriesMetrics) + c.collectionActive = false +} + +// Check if there is a collection running int he background +func (c *CollectionCache) isCollectionActive() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.collectionActive +} + +// During a collection, this func should be used to save the collected data +func (c *CollectionCache) putMetric(metricType string, timeSeries *TimeSeriesMetrics) { + c.mu.Lock() + c.activeTimeSeries[metricType] = timeSeries + c.mu.Unlock() } func NewMonitoringCollector( @@ -114,6 +167,11 @@ func NewMonitoringCollector( lastScrapeDurationSecondsMetric: lastScrapeDurationSecondsMetric, collectorFillMissingLabels: collectorFillMissingLabels, monitoringDropDelegatedProjects: monitoringDropDelegatedProjects, + cache: &CollectionCache{ + cachedTimeSeries: make(map[string]*TimeSeriesMetrics), + activeTimeSeries: make(map[string]*TimeSeriesMetrics), + collectionActive: false, + }, } return monitoringCollector, nil @@ -129,32 +187,41 @@ func (c *MonitoringCollector) Describe(ch chan<- *prometheus.Desc) { } func (c *MonitoringCollector) Collect(ch chan<- prometheus.Metric) { - var begun = time.Now() - errorMetric := float64(0) - if err := c.reportMonitoringMetrics(ch); err != nil { - errorMetric = float64(1) - c.scrapeErrorsTotalMetric.Inc() - log.Errorf("Error while getting Google Stackdriver Monitoring metrics: %s", err) + for _, timeSeries := range c.cache.cachedTimeSeries { + timeSeries.Complete(ch) } - c.scrapeErrorsTotalMetric.Collect(ch) + c.scrapeErrorsTotalMetric.Collect(ch) c.apiCallsTotalMetric.Collect(ch) - - c.scrapesTotalMetric.Inc() c.scrapesTotalMetric.Collect(ch) - - c.lastScrapeErrorMetric.Set(errorMetric) c.lastScrapeErrorMetric.Collect(ch) - - c.lastScrapeTimestampMetric.Set(float64(time.Now().Unix())) c.lastScrapeTimestampMetric.Collect(ch) - - c.lastScrapeDurationSecondsMetric.Set(time.Since(begun).Seconds()) c.lastScrapeDurationSecondsMetric.Collect(ch) + + if !c.cache.isCollectionActive() { + go func() { + start := time.Now() + errorMetric := float64(0) + + c.cache.markCollectionStarted() + if err := c.updateMetricsCache(); err != nil { + errorMetric = float64(1) + c.scrapeErrorsTotalMetric.Inc() + log.Errorf("Error while getting Google Stackdriver Monitoring metrics: %s", err) + } + + c.scrapesTotalMetric.Inc() + c.lastScrapeErrorMetric.Set(errorMetric) + c.lastScrapeTimestampMetric.Set(float64(time.Now().Unix())) + c.lastScrapeDurationSecondsMetric.Set(time.Since(start).Seconds()) + + c.cache.markCollectionCompleted() + }() + } } -func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metric) error { +func (c *MonitoringCollector) updateMetricsCache() error { metricDescriptorsFunction := func(page *monitoring.ListMetricDescriptorsResponse) error { var wg = &sync.WaitGroup{} @@ -181,7 +248,7 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri for _, metricDescriptor := range uniqueDescriptors { wg.Add(1) - go func(metricDescriptor *monitoring.MetricDescriptor, ch chan<- prometheus.Metric) { + go func(metricDescriptor *monitoring.MetricDescriptor) { defer wg.Done() log.Debugf("Retrieving Google Stackdriver Monitoring metrics for descriptor `%s`...", metricDescriptor.Type) filter := fmt.Sprintf("metric.type=\"%s\"", metricDescriptor.Type) @@ -193,9 +260,13 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri } timeSeriesListCall := c.monitoringService.Projects.TimeSeries.List(utils.ProjectResource(c.projectID)). Filter(filter). + PageSize(100000). IntervalStartTime(startTime.Format(time.RFC3339Nano)). IntervalEndTime(endTime.Format(time.RFC3339Nano)) + pageNumber := 0 + + start := time.Now() for { c.apiCallsTotalMetric.Inc() page, err := timeSeriesListCall.Do() @@ -204,10 +275,11 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri errChannel <- err break } + if page == nil { break } - if err := c.reportTimeSeriesMetrics(page, metricDescriptor, ch); err != nil { + if err := c.updateMetricsCacheForMetric(page, metricDescriptor); err != nil { log.Errorf("Error reporting Time Series metrics for descriptor `%s`: %v", metricDescriptor.Type, err) errChannel <- err break @@ -215,9 +287,14 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri if page.NextPageToken == "" { break } + pageNumber++ timeSeriesListCall.PageToken(page.NextPageToken) } - }(metricDescriptor, ch) + + elapsed := time.Since(start) + log.Debugf("Took %s to retrieve %v pages for metric descriptor %s", elapsed, pageNumber+1, metricDescriptor.Type) + + }(metricDescriptor) } wg.Wait() @@ -257,18 +334,15 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri return <-errChannel } -func (c *MonitoringCollector) reportTimeSeriesMetrics( +func (c *MonitoringCollector) updateMetricsCacheForMetric( page *monitoring.ListTimeSeriesResponse, - metricDescriptor *monitoring.MetricDescriptor, - ch chan<- prometheus.Metric, -) error { + metricDescriptor *monitoring.MetricDescriptor) error { var metricValue float64 var metricValueType prometheus.ValueType var newestTSPoint *monitoring.Point timeSeriesMetrics := &TimeSeriesMetrics{ metricDescriptor: metricDescriptor, - ch: ch, fillMissingLabels: c.collectorFillMissingLabels, constMetrics: make(map[string][]ConstMetric), histogramMetrics: make(map[string][]HistogramMetric), @@ -354,7 +428,7 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( timeSeriesMetrics.CollectNewConstMetric(timeSeries, labelKeys, metricValueType, metricValue, labelValues) } - timeSeriesMetrics.Complete() + c.cache.putMetric(metricDescriptor.Type, timeSeriesMetrics) return nil } diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index 193ea965..a5137977 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -4,8 +4,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "google.golang.org/api/monitoring/v3" - "github.com/frodenas/stackdriver_exporter/utils" "sort" + + "github.com/frodenas/stackdriver_exporter/utils" ) func buildFQName(timeSeries *monitoring.TimeSeries) string { @@ -18,7 +19,6 @@ func buildFQName(timeSeries *monitoring.TimeSeries) string { type TimeSeriesMetrics struct { metricDescriptor *monitoring.MetricDescriptor - ch chan<- prometheus.Metric fillMissingLabels bool constMetrics map[string][]ConstMetric @@ -74,7 +74,6 @@ func (t *TimeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time t.histogramMetrics[fqName] = append(vs, v) return } - t.ch <- t.newConstHistogram(fqName, labelKeys, dist, buckets, labelValues) } func (t *TimeSeriesMetrics) newConstHistogram(fqName string, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) prometheus.Metric { @@ -107,7 +106,6 @@ func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer t.constMetrics[fqName] = append(vs, v) return } - t.ch <- t.newConstMetric(fqName, labelKeys, metricValueType, metricValue, labelValues) } func (t *TimeSeriesMetrics) newConstMetric(fqName string, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string) prometheus.Metric { @@ -131,12 +129,12 @@ func hashLabelKeys(labelKeys []string) uint64 { return dh } -func (t *TimeSeriesMetrics) Complete() { - t.completeConstMetrics() - t.completeHistogramMetrics() +func (t *TimeSeriesMetrics) Complete(ch chan<- prometheus.Metric) { + t.completeConstMetrics(ch) + t.completeHistogramMetrics(ch) } -func (t *TimeSeriesMetrics) completeConstMetrics() { +func (t *TimeSeriesMetrics) completeConstMetrics(ch chan<- prometheus.Metric) { for _, vs := range t.constMetrics { if len(vs) > 1 { var needFill bool @@ -151,12 +149,12 @@ func (t *TimeSeriesMetrics) completeConstMetrics() { } for _, v := range vs { - t.ch <- t.newConstMetric(v.fqName, v.labelKeys, v.valueType, v.value, v.labelValues) + ch <- t.newConstMetric(v.fqName, v.labelKeys, v.valueType, v.value, v.labelValues) } } } -func (t *TimeSeriesMetrics) completeHistogramMetrics() { +func (t *TimeSeriesMetrics) completeHistogramMetrics(ch chan<- prometheus.Metric) { for _, vs := range t.histogramMetrics { if len(vs) > 1 { var needFill bool @@ -170,7 +168,7 @@ func (t *TimeSeriesMetrics) completeHistogramMetrics() { } } for _, v := range vs { - t.ch <- t.newConstHistogram(v.fqName, v.labelKeys, v.dist, v.buckets, v.labelValues) + ch <- t.newConstHistogram(v.fqName, v.labelKeys, v.dist, v.buckets, v.labelValues) } } }