Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cache last scrape results #71

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 99 additions & 25 deletions collectors/monitoring_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,59 @@ type MonitoringCollector struct {
lastScrapeDurationSecondsMetric prometheus.Gauge
collectorFillMissingLabels bool
monitoringDropDelegatedProjects bool

cache *CollectionCache
}

// CollectionCache double-buffers the metrics gathered from the Stackdriver
// Monitoring API so that Prometheus scrapes can be served from the last
// completed collection while a new one runs in the background.
type CollectionCache struct {
	// This map holds the read-only result of a collection run.
	// It will be served from the Prometheus scrape endpoint until the next
	// collection is complete.
	//
	// NOTE(review): this field is written under `mu` in
	// markCollectionCompleted but appears to be read without the lock by
	// Collect — confirm whether `mu` should guard it too to avoid a data
	// race between a finishing collection and a concurrent scrape.
	cachedTimeSeries map[string]*TimeSeriesMetrics

	// This map holds the (potentially incomplete) metrics that have been collected.
	// Once completed it will replace the `cachedTimeSeries` and will start being served.
	activeTimeSeries map[string]*TimeSeriesMetrics

	// Indicates whether there is a collection currently running, and populating `activeTimeSeries`
	// at the moment.
	collectionActive bool

	// Guards `activeTimeSeries` and `collectionActive`.
	mu sync.Mutex
}

// markCollectionStarted records that a background collection run is now in
// progress and will be populating activeTimeSeries.
//
// NOTE(review): the caller checks isCollectionActive before spawning the
// goroutine that calls this, so two concurrent scrapes could both start a
// collection (check-then-act) — consider a single locked test-and-set.
func (c *CollectionCache) markCollectionStarted() {
	log.Debugf("markCollectionStarted")
	c.mu.Lock()
	defer c.mu.Unlock()
	c.collectionActive = true
}

// markCollectionCompleted publishes the result of the finished run: the
// freshly collected set becomes the served cache, an empty map is installed
// for the next run, and the active flag is cleared.
func (c *CollectionCache) markCollectionCompleted() {
	log.Debugf("markCollectionCompleted")
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cachedTimeSeries = c.activeTimeSeries
	c.activeTimeSeries = make(map[string]*TimeSeriesMetrics)
	c.collectionActive = false
}

// isCollectionActive reports whether a collection run is currently in
// progress in the background.
func (c *CollectionCache) isCollectionActive() bool {
	c.mu.Lock()
	active := c.collectionActive
	c.mu.Unlock()
	return active
}

// putMetric stores the time series collected for a single metric type into
// the in-progress (active) result set; call only while a collection runs.
func (c *CollectionCache) putMetric(metricType string, timeSeries *TimeSeriesMetrics) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.activeTimeSeries[metricType] = timeSeries
}

func NewMonitoringCollector(
Expand Down Expand Up @@ -114,6 +167,11 @@ func NewMonitoringCollector(
lastScrapeDurationSecondsMetric: lastScrapeDurationSecondsMetric,
collectorFillMissingLabels: collectorFillMissingLabels,
monitoringDropDelegatedProjects: monitoringDropDelegatedProjects,
cache: &CollectionCache{
cachedTimeSeries: make(map[string]*TimeSeriesMetrics),
activeTimeSeries: make(map[string]*TimeSeriesMetrics),
collectionActive: false,
},
}

return monitoringCollector, nil
Expand All @@ -129,32 +187,41 @@ func (c *MonitoringCollector) Describe(ch chan<- *prometheus.Desc) {
}

func (c *MonitoringCollector) Collect(ch chan<- prometheus.Metric) {
var begun = time.Now()

errorMetric := float64(0)
if err := c.reportMonitoringMetrics(ch); err != nil {
errorMetric = float64(1)
c.scrapeErrorsTotalMetric.Inc()
log.Errorf("Error while getting Google Stackdriver Monitoring metrics: %s", err)
for _, timeSeries := range c.cache.cachedTimeSeries {
timeSeries.Complete(ch)
}
c.scrapeErrorsTotalMetric.Collect(ch)

c.scrapeErrorsTotalMetric.Collect(ch)
c.apiCallsTotalMetric.Collect(ch)

c.scrapesTotalMetric.Inc()
c.scrapesTotalMetric.Collect(ch)

c.lastScrapeErrorMetric.Set(errorMetric)
c.lastScrapeErrorMetric.Collect(ch)

c.lastScrapeTimestampMetric.Set(float64(time.Now().Unix()))
c.lastScrapeTimestampMetric.Collect(ch)

c.lastScrapeDurationSecondsMetric.Set(time.Since(begun).Seconds())
c.lastScrapeDurationSecondsMetric.Collect(ch)

if !c.cache.isCollectionActive() {
go func() {
start := time.Now()
errorMetric := float64(0)

c.cache.markCollectionStarted()
if err := c.updateMetricsCache(); err != nil {
errorMetric = float64(1)
c.scrapeErrorsTotalMetric.Inc()
log.Errorf("Error while getting Google Stackdriver Monitoring metrics: %s", err)
}

c.scrapesTotalMetric.Inc()
c.lastScrapeErrorMetric.Set(errorMetric)
c.lastScrapeTimestampMetric.Set(float64(time.Now().Unix()))
c.lastScrapeDurationSecondsMetric.Set(time.Since(start).Seconds())

c.cache.markCollectionCompleted()
}()
}
}

func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metric) error {
func (c *MonitoringCollector) updateMetricsCache() error {
metricDescriptorsFunction := func(page *monitoring.ListMetricDescriptorsResponse) error {
var wg = &sync.WaitGroup{}

Expand All @@ -181,7 +248,7 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri

for _, metricDescriptor := range uniqueDescriptors {
wg.Add(1)
go func(metricDescriptor *monitoring.MetricDescriptor, ch chan<- prometheus.Metric) {
go func(metricDescriptor *monitoring.MetricDescriptor) {
defer wg.Done()
log.Debugf("Retrieving Google Stackdriver Monitoring metrics for descriptor `%s`...", metricDescriptor.Type)
filter := fmt.Sprintf("metric.type=\"%s\"", metricDescriptor.Type)
Expand All @@ -193,9 +260,13 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri
}
timeSeriesListCall := c.monitoringService.Projects.TimeSeries.List(utils.ProjectResource(c.projectID)).
Filter(filter).
PageSize(100000).
IntervalStartTime(startTime.Format(time.RFC3339Nano)).
IntervalEndTime(endTime.Format(time.RFC3339Nano))

pageNumber := 0

start := time.Now()
for {
c.apiCallsTotalMetric.Inc()
page, err := timeSeriesListCall.Do()
Expand All @@ -204,20 +275,26 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri
errChannel <- err
break
}

if page == nil {
break
}
if err := c.reportTimeSeriesMetrics(page, metricDescriptor, ch); err != nil {
if err := c.updateMetricsCacheForMetric(page, metricDescriptor); err != nil {
log.Errorf("Error reporting Time Series metrics for descriptor `%s`: %v", metricDescriptor.Type, err)
errChannel <- err
break
}
if page.NextPageToken == "" {
break
}
pageNumber++
timeSeriesListCall.PageToken(page.NextPageToken)
}
}(metricDescriptor, ch)

elapsed := time.Since(start)
log.Debugf("Took %s to retrieve %v pages for metric descriptor %s", elapsed, pageNumber+1, metricDescriptor.Type)

}(metricDescriptor)
}

wg.Wait()
Expand Down Expand Up @@ -257,18 +334,15 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri
return <-errChannel
}

func (c *MonitoringCollector) reportTimeSeriesMetrics(
func (c *MonitoringCollector) updateMetricsCacheForMetric(
page *monitoring.ListTimeSeriesResponse,
metricDescriptor *monitoring.MetricDescriptor,
ch chan<- prometheus.Metric,
) error {
metricDescriptor *monitoring.MetricDescriptor) error {
var metricValue float64
var metricValueType prometheus.ValueType
var newestTSPoint *monitoring.Point

timeSeriesMetrics := &TimeSeriesMetrics{
metricDescriptor: metricDescriptor,
ch: ch,
fillMissingLabels: c.collectorFillMissingLabels,
constMetrics: make(map[string][]ConstMetric),
histogramMetrics: make(map[string][]HistogramMetric),
Expand Down Expand Up @@ -354,7 +428,7 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics(

timeSeriesMetrics.CollectNewConstMetric(timeSeries, labelKeys, metricValueType, metricValue, labelValues)
}
timeSeriesMetrics.Complete()
c.cache.putMetric(metricDescriptor.Type, timeSeriesMetrics)
return nil
}

Expand Down
20 changes: 9 additions & 11 deletions collectors/monitoring_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/api/monitoring/v3"

"github.com/frodenas/stackdriver_exporter/utils"
"sort"

"github.com/frodenas/stackdriver_exporter/utils"
)

func buildFQName(timeSeries *monitoring.TimeSeries) string {
Expand All @@ -18,7 +19,6 @@ func buildFQName(timeSeries *monitoring.TimeSeries) string {

type TimeSeriesMetrics struct {
metricDescriptor *monitoring.MetricDescriptor
ch chan<- prometheus.Metric

fillMissingLabels bool
constMetrics map[string][]ConstMetric
Expand Down Expand Up @@ -74,7 +74,6 @@ func (t *TimeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time
t.histogramMetrics[fqName] = append(vs, v)
return
}
t.ch <- t.newConstHistogram(fqName, labelKeys, dist, buckets, labelValues)
}

func (t *TimeSeriesMetrics) newConstHistogram(fqName string, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) prometheus.Metric {
Expand Down Expand Up @@ -107,7 +106,6 @@ func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer
t.constMetrics[fqName] = append(vs, v)
return
}
t.ch <- t.newConstMetric(fqName, labelKeys, metricValueType, metricValue, labelValues)
}

func (t *TimeSeriesMetrics) newConstMetric(fqName string, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string) prometheus.Metric {
Expand All @@ -131,12 +129,12 @@ func hashLabelKeys(labelKeys []string) uint64 {
return dh
}

func (t *TimeSeriesMetrics) Complete() {
t.completeConstMetrics()
t.completeHistogramMetrics()
func (t *TimeSeriesMetrics) Complete(ch chan<- prometheus.Metric) {
t.completeConstMetrics(ch)
t.completeHistogramMetrics(ch)
}

func (t *TimeSeriesMetrics) completeConstMetrics() {
func (t *TimeSeriesMetrics) completeConstMetrics(ch chan<- prometheus.Metric) {
for _, vs := range t.constMetrics {
if len(vs) > 1 {
var needFill bool
Expand All @@ -151,12 +149,12 @@ func (t *TimeSeriesMetrics) completeConstMetrics() {
}

for _, v := range vs {
t.ch <- t.newConstMetric(v.fqName, v.labelKeys, v.valueType, v.value, v.labelValues)
ch <- t.newConstMetric(v.fqName, v.labelKeys, v.valueType, v.value, v.labelValues)
}
}
}

func (t *TimeSeriesMetrics) completeHistogramMetrics() {
func (t *TimeSeriesMetrics) completeHistogramMetrics(ch chan<- prometheus.Metric) {
for _, vs := range t.histogramMetrics {
if len(vs) > 1 {
var needFill bool
Expand All @@ -170,7 +168,7 @@ func (t *TimeSeriesMetrics) completeHistogramMetrics() {
}
}
for _, v := range vs {
t.ch <- t.newConstHistogram(v.fqName, v.labelKeys, v.dist, v.buckets, v.labelValues)
ch <- t.newConstHistogram(v.fqName, v.labelKeys, v.dist, v.buckets, v.labelValues)
}
}
}
Expand Down