From ec64fb91c523876fff39529a10310bbe5cad3600 Mon Sep 17 00:00:00 2001 From: Nicolas Takashi Date: Sat, 30 Jul 2022 00:05:48 +0100 Subject: [PATCH] [FEAT] updating ttl --- cmd/prom-aggregation-gateway/main.go | 55 ++++++++-- cmd/prom-aggregation-gateway/main_test.go | 126 +++++++++++----------- 2 files changed, 112 insertions(+), 69 deletions(-) diff --git a/cmd/prom-aggregation-gateway/main.go b/cmd/prom-aggregation-gateway/main.go index 19e9e93..425cfd1 100644 --- a/cmd/prom-aggregation-gateway/main.go +++ b/cmd/prom-aggregation-gateway/main.go @@ -8,6 +8,7 @@ import ( "net/http" "sort" "sync" + "time" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" @@ -78,7 +79,18 @@ func mergeBuckets(a, b []*dto.Bucket) []*dto.Bucket { return output } +func makeTimestampMs() int64 { + return time.Now().UnixNano() / int64(time.Millisecond) +} + func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric { + // Getting the metric timestamp or creating one when that metric is merged + // It will be used to cleanup old metrics that have not been merged lately + metricTimestamp := b.GetTimestampMs() + if metricTimestamp == 0 { + metricTimestamp = makeTimestampMs() + } + switch ty { case dto.MetricType_COUNTER: return &dto.Metric{ @@ -86,6 +98,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric { Counter: &dto.Counter{ Value: float64ptr(*a.Counter.Value + *b.Counter.Value), }, + TimestampMs: &metricTimestamp, } case dto.MetricType_GAUGE: @@ -97,6 +110,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric { Gauge: &dto.Gauge{ Value: float64ptr(*a.Gauge.Value + *b.Gauge.Value), }, + TimestampMs: &metricTimestamp, } case dto.MetricType_HISTOGRAM: @@ -107,6 +121,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric { SampleSum: float64ptr(*a.Histogram.SampleSum + *b.Histogram.SampleSum), Bucket: mergeBuckets(a.Histogram.Bucket, b.Histogram.Bucket), }, + TimestampMs: &metricTimestamp, } case dto.MetricType_UNTYPED: @@ -115,6 +130,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric { Untyped: &dto.Untyped{ Value: float64ptr(*a.Untyped.Value + *b.Untyped.Value), }, + TimestampMs: &metricTimestamp, } case dto.MetricType_SUMMARY: @@ -164,13 +180,15 @@ func mergeFamily(a, b *dto.MetricFamily) (*dto.MetricFamily, error) { } type aggate struct { + timeToLive time.Duration familiesLock sync.RWMutex families map[string]*dto.MetricFamily } -func newAggate() *aggate { +func newAggate(ttl time.Duration) *aggate { return &aggate{ - families: map[string]*dto.MetricFamily{}, + timeToLive: ttl, + families: map[string]*dto.MetricFamily{}, } } @@ -196,6 +214,21 @@ func validateFamily(f *dto.MetricFamily) error { return nil } +func cleanupFamily(metrics []*dto.Metric, ttl int64) []*dto.Metric { + // CurrentTS for old metrics check + nowTS := makeTimestampMs() + + // Iterating over metrics and filtering out the old, not recently merged ones + var updatedMetrics []*dto.Metric + for _, metric := range metrics { + if nowTS-metric.GetTimestampMs() <= ttl { + updatedMetrics = append(updatedMetrics, metric) + } + } + + return updatedMetrics +} + func (a *aggate) parseAndMerge(r io.Reader) error { var parser expfmt.TextParser inFamilies, err := parser.TextToMetricFamilies(r) @@ -240,11 +273,20 @@ func (a *aggate) handler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", string(contentType)) enc := expfmt.NewEncoder(w, contentType) - a.familiesLock.RLock() - defer a.familiesLock.RUnlock() + a.familiesLock.Lock() + defer a.familiesLock.Unlock() metricNames := []string{} for name := range a.families { - metricNames = append(metricNames, name) + // Cleaning up metrics that have not been merged for a while + a.families[name].Metric = cleanupFamily(a.families[name].GetMetric(), a.timeToLive.Milliseconds) + + // Including only families that still have metrics to be scraped + if len(a.families[name].Metric) > 0 { + metricNames = append(metricNames, name) + } else { + // Remove the empty families + delete(a.families, name) + } } sort.Sort(sort.StringSlice(metricNames)) @@ -268,9 +310,10 @@ func main() { listen := flag.String("listen", ":80", "Address and port to listen on.") cors := flag.String("cors", "*", "The 'Access-Control-Allow-Origin' value to be returned.") pushPath := flag.String("push-path", "/metrics/", "HTTP path to accept pushed metrics.") + timeToLive := flag.Duration("ttl", 3600*time.Second, "How long unmerged metrics will live, in milliseconds (default 1h)") flag.Parse() - a := newAggate() + a := newAggate(*timeToLive) http.HandleFunc("/metrics", a.handler) http.HandleFunc("/-/healthy", handleHealthCheck) http.HandleFunc("/-/ready", handleHealthCheck) diff --git a/cmd/prom-aggregation-gateway/main_test.go b/cmd/prom-aggregation-gateway/main_test.go index a205178..95d0c3c 100644 --- a/cmd/prom-aggregation-gateway/main_test.go +++ b/cmd/prom-aggregation-gateway/main_test.go @@ -19,124 +19,124 @@ gauge 42 counter 31 # HELP histogram A histogram # TYPE histogram histogram -histogram_bucket{le="1"} 0 -histogram_bucket{le="2"} 0 -histogram_bucket{le="3"} 3 -histogram_bucket{le="4"} 4 -histogram_bucket{le="5"} 4 -histogram_bucket{le="6"} 4 -histogram_bucket{le="7"} 4 -histogram_bucket{le="8"} 4 -histogram_bucket{le="9"} 4 -histogram_bucket{le="10"} 4 -histogram_bucket{le="+Inf"} 4 -histogram_sum{} 2.5 -histogram_count{} 1 +histogram_bucket{le="1"} 0 %[1]d +histogram_bucket{le="2"} 0 %[1]d +histogram_bucket{le="3"} 3 %[1]d +histogram_bucket{le="4"} 4 %[1]d +histogram_bucket{le="5"} 4 %[1]d +histogram_bucket{le="6"} 4 %[1]d +histogram_bucket{le="7"} 4 %[1]d +histogram_bucket{le="8"} 4 %[1]d +histogram_bucket{le="9"} 4 %[1]d +histogram_bucket{le="10"} 4 %[1]d +histogram_bucket{le="+Inf"} 4 %[1]d +histogram_sum{} 2.5 %[1]d +histogram_count{} 1 %[1]d ` in2 = ` # HELP gauge A gauge # TYPE gauge gauge -gauge 57 +gauge 57 %[1]d # HELP counter A counter # TYPE counter counter -counter 29 +counter 29 %[1]d # HELP histogram A histogram # TYPE histogram histogram -histogram_bucket{le="1"} 0 -histogram_bucket{le="2"} 0 -histogram_bucket{le="3"} 0 -histogram_bucket{le="4"} 4 -histogram_bucket{le="5"} 5 -histogram_bucket{le="6"} 5 -histogram_bucket{le="7"} 5 -histogram_bucket{le="8"} 5 -histogram_bucket{le="9"} 5 -histogram_bucket{le="10"} 5 -histogram_bucket{le="+Inf"} 5 -histogram_sum 4.5 -histogram_count 1 +histogram_bucket{le="1"} 0 %[1]d +histogram_bucket{le="2"} 0 %[1]d +histogram_bucket{le="3"} 0 %[1]d +histogram_bucket{le="4"} 4 %[1]d +histogram_bucket{le="5"} 5 %[1]d +histogram_bucket{le="6"} 5 %[1]d +histogram_bucket{le="7"} 5 %[1]d +histogram_bucket{le="8"} 5 %[1]d +histogram_bucket{le="9"} 5 %[1]d +histogram_bucket{le="10"} 5 %[1]d +histogram_bucket{le="+Inf"} 5 %[1]d +histogram_sum 4.5 %[1]d +histogram_count 1 %[1]d ` want = `# HELP counter A counter # TYPE counter counter -counter 60 +counter 60 %[1]d # HELP gauge A gauge # TYPE gauge gauge -gauge 99 +gauge 99 %[1]d # HELP histogram A histogram # TYPE histogram histogram -histogram_bucket{le="1"} 0 -histogram_bucket{le="2"} 0 -histogram_bucket{le="3"} 3 -histogram_bucket{le="4"} 8 -histogram_bucket{le="5"} 9 -histogram_bucket{le="6"} 9 -histogram_bucket{le="7"} 9 -histogram_bucket{le="8"} 9 -histogram_bucket{le="9"} 9 -histogram_bucket{le="10"} 9 -histogram_bucket{le="+Inf"} 9 -histogram_sum 7 -histogram_count 2 +histogram_bucket{le="1"} 0 %[1]d +histogram_bucket{le="2"} 0 %[1]d +histogram_bucket{le="3"} 3 %[1]d +histogram_bucket{le="4"} 8 %[1]d +histogram_bucket{le="5"} 9 %[1]d +histogram_bucket{le="6"} 9 %[1]d +histogram_bucket{le="7"} 9 %[1]d +histogram_bucket{le="8"} 9 %[1]d +histogram_bucket{le="9"} 9 %[1]d +histogram_bucket{le="10"} 9 %[1]d +histogram_bucket{le="+Inf"} 9 %[1]d +histogram_sum 7 %[1]d +histogram_count 2 %[1]d ` multilabel1 = `# HELP counter A counter # TYPE counter counter -counter{a="a",b="b"} 1 +counter{a="a",b="b"} 1 %[1]d ` multilabel2 = `# HELP counter A counter # TYPE counter counter -counter{a="a",b="b"} 2 +counter{a="a",b="b"} 2 %[1]d ` multilabelResult = `# HELP counter A counter # TYPE counter counter -counter{a="a",b="b"} 3 +counter{a="a",b="b"} 3 %[1]d ` labelFields1 = `# HELP ui_page_render_errors A counter # TYPE ui_page_render_errors counter -ui_page_render_errors{path="/org/:orgId"} 1 -ui_page_render_errors{path="/prom/:orgId"} 1 +ui_page_render_errors{path="/org/:orgId"} 1 %[1]d +ui_page_render_errors{path="/prom/:orgId"} 1 %[1]d ` labelFields2 = `# HELP ui_page_render_errors A counter # TYPE ui_page_render_errors counter -ui_page_render_errors{path="/prom/:orgId"} 1 +ui_page_render_errors{path="/prom/:orgId"} 1 %[1]d ` labelFieldResult = `# HELP ui_page_render_errors A counter # TYPE ui_page_render_errors counter -ui_page_render_errors{path="/org/:orgId"} 1 -ui_page_render_errors{path="/prom/:orgId"} 2 +ui_page_render_errors{path="/org/:orgId"} 1 %[1]d +ui_page_render_errors{path="/prom/:orgId"} 2 %[1]d ` gaugeInput = ` # HELP ui_external_lib_loaded A gauge with entries in un-sorted order # TYPE ui_external_lib_loaded gauge -ui_external_lib_loaded{name="ga",loaded="true"} 1 -ui_external_lib_loaded{name="Intercom",loaded="true"} 1 -ui_external_lib_loaded{name="mixpanel",loaded="true"} 1 +ui_external_lib_loaded{name="ga",loaded="true"} 1 %[1]d +ui_external_lib_loaded{name="Intercom",loaded="true"} 1 %[1]d +ui_external_lib_loaded{name="mixpanel",loaded="true"} 1 %[1]d ` gaugeOutput = `# HELP ui_external_lib_loaded A gauge with entries in un-sorted order # TYPE ui_external_lib_loaded gauge -ui_external_lib_loaded{loaded="true",name="Intercom"} 2 -ui_external_lib_loaded{loaded="true",name="ga"} 2 -ui_external_lib_loaded{loaded="true",name="mixpanel"} 2 +ui_external_lib_loaded{loaded="true",name="Intercom"} 2 %[1]d +ui_external_lib_loaded{loaded="true",name="ga"} 2 %[1]d +ui_external_lib_loaded{loaded="true",name="mixpanel"} 2 %[1]d ` duplicateLabels = ` # HELP ui_external_lib_loaded Test with duplicate values # TYPE ui_external_lib_loaded gauge -ui_external_lib_loaded{name="Munchkin",loaded="true"} 15171 -ui_external_lib_loaded{name="Munchkin",loaded="true"} 1 +ui_external_lib_loaded{name="Munchkin",loaded="true"} 15171 %[1]d +ui_external_lib_loaded{name="Munchkin",loaded="true"} 1 %[1]d ` duplicateError = `Duplicate labels: {__name__="ui_external_lib_loaded", loaded="true", name="Munchkin"}` reorderedLabels1 = `# HELP counter A counter # TYPE counter counter -counter{a="a",b="b"} 1 +counter{a="a",b="b"} 1 %[1]d ` reorderedLabels2 = `# HELP counter A counter # TYPE counter counter -counter{b="b",a="a"} 2 +counter{b="b",a="a"} 2 %[1]d ` reorderedLabelsResult = `# HELP counter A counter # TYPE counter counter -counter{a="a",b="b"} 3 +counter{a="a",b="b"} 3 %[1]d ` ) @@ -154,7 +154,7 @@ func TestAggate(t *testing.T) { {duplicateLabels, "", "", fmt.Errorf("%s", duplicateError), nil}, {reorderedLabels1, reorderedLabels2, reorderedLabelsResult, nil, nil}, } { - a := newAggate() + a := newAggate(3600000) if err := a.parseAndMerge(strings.NewReader(c.a)); err != nil { if c.err1 == nil {