Skip to content

Commit

Permalink
[FEAT] updating ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolastakashi committed Jul 29, 2022
1 parent c4415bb commit ec64fb9
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 69 deletions.
55 changes: 49 additions & 6 deletions cmd/prom-aggregation-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"sort"
"sync"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
Expand Down Expand Up @@ -78,14 +79,26 @@ func mergeBuckets(a, b []*dto.Bucket) []*dto.Bucket {
return output
}

func makeTimestampMs() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}

func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
// Getting the metric timestamp or creating one when that metric is merged
// It will be used to cleanup old metrics that have not been merged lately
metricTimestamp := b.GetTimestampMs()
if metricTimestamp == 0 {
metricTimestamp = makeTimestampMs()
}

switch ty {
case dto.MetricType_COUNTER:
return &dto.Metric{
Label: a.Label,
Counter: &dto.Counter{
Value: float64ptr(*a.Counter.Value + *b.Counter.Value),
},
TimestampMs: &metricTimestamp,
}

case dto.MetricType_GAUGE:
Expand All @@ -97,6 +110,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
Gauge: &dto.Gauge{
Value: float64ptr(*a.Gauge.Value + *b.Gauge.Value),
},
TimestampMs: &metricTimestamp,
}

case dto.MetricType_HISTOGRAM:
Expand All @@ -107,6 +121,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
SampleSum: float64ptr(*a.Histogram.SampleSum + *b.Histogram.SampleSum),
Bucket: mergeBuckets(a.Histogram.Bucket, b.Histogram.Bucket),
},
TimestampMs: &metricTimestamp,
}

case dto.MetricType_UNTYPED:
Expand All @@ -115,6 +130,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
Untyped: &dto.Untyped{
Value: float64ptr(*a.Untyped.Value + *b.Untyped.Value),
},
TimestampMs: &metricTimestamp,
}

case dto.MetricType_SUMMARY:
Expand Down Expand Up @@ -164,13 +180,15 @@ func mergeFamily(a, b *dto.MetricFamily) (*dto.MetricFamily, error) {
}

type aggate struct {
timeToLive time.Duration
familiesLock sync.RWMutex
families map[string]*dto.MetricFamily
}

func newAggate() *aggate {
func newAggate(ttl time.Duration) *aggate {
return &aggate{
families: map[string]*dto.MetricFamily{},
timeToLive: ttl,
families: map[string]*dto.MetricFamily{},
}
}

Expand All @@ -196,6 +214,21 @@ func validateFamily(f *dto.MetricFamily) error {
return nil
}

func cleanupFamily(metrics []*dto.Metric, ttl int64) []*dto.Metric {
// CurrentTS for old metrics check
nowTS := makeTimestampMs()

// Iterating over metrics and filtering out the old, not recently merged ones
var updatedMetrics []*dto.Metric
for _, metric := range metrics {
if nowTS-metric.GetTimestampMs() <= ttl {
updatedMetrics = append(updatedMetrics, metric)
}
}

return updatedMetrics
}

func (a *aggate) parseAndMerge(r io.Reader) error {
var parser expfmt.TextParser
inFamilies, err := parser.TextToMetricFamilies(r)
Expand Down Expand Up @@ -240,11 +273,20 @@ func (a *aggate) handler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", string(contentType))
enc := expfmt.NewEncoder(w, contentType)

a.familiesLock.RLock()
defer a.familiesLock.RUnlock()
a.familiesLock.Lock()
defer a.familiesLock.Unlock()
metricNames := []string{}
for name := range a.families {
metricNames = append(metricNames, name)
// Cleaning up metrics that have not been merged for a while
a.families[name].Metric = cleanupFamily(a.families[name].GetMetric(), a.timeToLive.Milliseconds)

// Including only families that still have metrics to be scraped
if len(a.families[name].Metric) > 0 {
metricNames = append(metricNames, name)
} else {
// Remove the empty families
delete(a.families, name)
}
}
sort.Sort(sort.StringSlice(metricNames))

Expand All @@ -268,9 +310,10 @@ func main() {
listen := flag.String("listen", ":80", "Address and port to listen on.")
cors := flag.String("cors", "*", "The 'Access-Control-Allow-Origin' value to be returned.")
pushPath := flag.String("push-path", "/metrics/", "HTTP path to accept pushed metrics.")
timeToLive := flag.Duration("ttl", 3600*time.Second, "How long unmerged metrics will live, in milliseconds (default 1h)")
flag.Parse()

a := newAggate()
a := newAggate(*timeToLive)
http.HandleFunc("/metrics", a.handler)
http.HandleFunc("/-/healthy", handleHealthCheck)
http.HandleFunc("/-/ready", handleHealthCheck)
Expand Down
126 changes: 63 additions & 63 deletions cmd/prom-aggregation-gateway/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,124 +19,124 @@ gauge 42
counter 31
# HELP histogram A histogram
# TYPE histogram histogram
histogram_bucket{le="1"} 0
histogram_bucket{le="2"} 0
histogram_bucket{le="3"} 3
histogram_bucket{le="4"} 4
histogram_bucket{le="5"} 4
histogram_bucket{le="6"} 4
histogram_bucket{le="7"} 4
histogram_bucket{le="8"} 4
histogram_bucket{le="9"} 4
histogram_bucket{le="10"} 4
histogram_bucket{le="+Inf"} 4
histogram_sum{} 2.5
histogram_count{} 1
histogram_bucket{le="1"} 0 %[1]d
histogram_bucket{le="2"} 0 %[1]d
histogram_bucket{le="3"} 3 %[1]d
histogram_bucket{le="4"} 4 %[1]d
histogram_bucket{le="5"} 4 %[1]d
histogram_bucket{le="6"} 4 %[1]d
histogram_bucket{le="7"} 4 %[1]d
histogram_bucket{le="8"} 4 %[1]d
histogram_bucket{le="9"} 4 %[1]d
histogram_bucket{le="10"} 4 %[1]d
histogram_bucket{le="+Inf"} 4 %[1]d
histogram_sum{} 2.5 %[1]d
histogram_count{} 1 %[1]d
`
in2 = `
# HELP gauge A gauge
# TYPE gauge gauge
gauge 57
gauge 57 %[1]d
# HELP counter A counter
# TYPE counter counter
counter 29
counter 29 %[1]d
# HELP histogram A histogram
# TYPE histogram histogram
histogram_bucket{le="1"} 0
histogram_bucket{le="2"} 0
histogram_bucket{le="3"} 0
histogram_bucket{le="4"} 4
histogram_bucket{le="5"} 5
histogram_bucket{le="6"} 5
histogram_bucket{le="7"} 5
histogram_bucket{le="8"} 5
histogram_bucket{le="9"} 5
histogram_bucket{le="10"} 5
histogram_bucket{le="+Inf"} 5
histogram_sum 4.5
histogram_count 1
histogram_bucket{le="1"} 0 %[1]d
histogram_bucket{le="2"} 0 %[1]d
histogram_bucket{le="3"} 0 %[1]d
histogram_bucket{le="4"} 4 %[1]d
histogram_bucket{le="5"} 5 %[1]d
histogram_bucket{le="6"} 5 %[1]d
histogram_bucket{le="7"} 5 %[1]d
histogram_bucket{le="8"} 5 %[1]d
histogram_bucket{le="9"} 5 %[1]d
histogram_bucket{le="10"} 5 %[1]d
histogram_bucket{le="+Inf"} 5 %[1]d
histogram_sum 4.5 %[1]d
histogram_count 1 %[1]d
`
want = `# HELP counter A counter
# TYPE counter counter
counter 60
counter 60 %[1]d
# HELP gauge A gauge
# TYPE gauge gauge
gauge 99
gauge 99 %[1]d
# HELP histogram A histogram
# TYPE histogram histogram
histogram_bucket{le="1"} 0
histogram_bucket{le="2"} 0
histogram_bucket{le="3"} 3
histogram_bucket{le="4"} 8
histogram_bucket{le="5"} 9
histogram_bucket{le="6"} 9
histogram_bucket{le="7"} 9
histogram_bucket{le="8"} 9
histogram_bucket{le="9"} 9
histogram_bucket{le="10"} 9
histogram_bucket{le="+Inf"} 9
histogram_sum 7
histogram_count 2
histogram_bucket{le="1"} 0 %[1]d
histogram_bucket{le="2"} 0 %[1]d
histogram_bucket{le="3"} 3 %[1]d
histogram_bucket{le="4"} 8 %[1]d
histogram_bucket{le="5"} 9 %[1]d
histogram_bucket{le="6"} 9 %[1]d
histogram_bucket{le="7"} 9 %[1]d
histogram_bucket{le="8"} 9 %[1]d
histogram_bucket{le="9"} 9 %[1]d
histogram_bucket{le="10"} 9 %[1]d
histogram_bucket{le="+Inf"} 9 %[1]d
histogram_sum 7 %[1]d
histogram_count 2 %[1]d
`

multilabel1 = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 1
counter{a="a",b="b"} 1 %[1]d
`
multilabel2 = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 2
counter{a="a",b="b"} 2 %[1]d
`
multilabelResult = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 3
counter{a="a",b="b"} 3 %[1]d
`
labelFields1 = `# HELP ui_page_render_errors A counter
# TYPE ui_page_render_errors counter
ui_page_render_errors{path="/org/:orgId"} 1
ui_page_render_errors{path="/prom/:orgId"} 1
ui_page_render_errors{path="/org/:orgId"} 1 %[1]d
ui_page_render_errors{path="/prom/:orgId"} 1 %[1]d
`
labelFields2 = `# HELP ui_page_render_errors A counter
# TYPE ui_page_render_errors counter
ui_page_render_errors{path="/prom/:orgId"} 1
ui_page_render_errors{path="/prom/:orgId"} 1 %[1]d
`
labelFieldResult = `# HELP ui_page_render_errors A counter
# TYPE ui_page_render_errors counter
ui_page_render_errors{path="/org/:orgId"} 1
ui_page_render_errors{path="/prom/:orgId"} 2
ui_page_render_errors{path="/org/:orgId"} 1 %[1]d
ui_page_render_errors{path="/prom/:orgId"} 2 %[1]d
`
gaugeInput = `
# HELP ui_external_lib_loaded A gauge with entries in un-sorted order
# TYPE ui_external_lib_loaded gauge
ui_external_lib_loaded{name="ga",loaded="true"} 1
ui_external_lib_loaded{name="Intercom",loaded="true"} 1
ui_external_lib_loaded{name="mixpanel",loaded="true"} 1
ui_external_lib_loaded{name="ga",loaded="true"} 1 %[1]d
ui_external_lib_loaded{name="Intercom",loaded="true"} 1 %[1]d
ui_external_lib_loaded{name="mixpanel",loaded="true"} 1 %[1]d
`
gaugeOutput = `# HELP ui_external_lib_loaded A gauge with entries in un-sorted order
# TYPE ui_external_lib_loaded gauge
ui_external_lib_loaded{loaded="true",name="Intercom"} 2
ui_external_lib_loaded{loaded="true",name="ga"} 2
ui_external_lib_loaded{loaded="true",name="mixpanel"} 2
ui_external_lib_loaded{loaded="true",name="Intercom"} 2 %[1]d
ui_external_lib_loaded{loaded="true",name="ga"} 2 %[1]d
ui_external_lib_loaded{loaded="true",name="mixpanel"} 2 %[1]d
`
duplicateLabels = `
# HELP ui_external_lib_loaded Test with duplicate values
# TYPE ui_external_lib_loaded gauge
ui_external_lib_loaded{name="Munchkin",loaded="true"} 15171
ui_external_lib_loaded{name="Munchkin",loaded="true"} 1
ui_external_lib_loaded{name="Munchkin",loaded="true"} 15171 %[1]d
ui_external_lib_loaded{name="Munchkin",loaded="true"} 1 %[1]d
`
duplicateError = `Duplicate labels: {__name__="ui_external_lib_loaded", loaded="true", name="Munchkin"}`

reorderedLabels1 = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 1
counter{a="a",b="b"} 1 %[1]d
`
reorderedLabels2 = `# HELP counter A counter
# TYPE counter counter
counter{b="b",a="a"} 2
counter{b="b",a="a"} 2 %[1]d
`
reorderedLabelsResult = `# HELP counter A counter
# TYPE counter counter
counter{a="a",b="b"} 3
counter{a="a",b="b"} 3 %[1]d
`
)

Expand All @@ -154,7 +154,7 @@ func TestAggate(t *testing.T) {
{duplicateLabels, "", "", fmt.Errorf("%s", duplicateError), nil},
{reorderedLabels1, reorderedLabels2, reorderedLabelsResult, nil, nil},
} {
a := newAggate()
a := newAggate(3600000)

if err := a.parseAndMerge(strings.NewReader(c.a)); err != nil {
if c.err1 == nil {
Expand Down

0 comments on commit ec64fb9

Please sign in to comment.