Skip to content

Commit

Permalink
Merge branch 'main' into add-k6-benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
sontek authored Jan 2, 2023
2 parents a994323 + 1b7d0d9 commit 7c1e25b
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 38 deletions.
51 changes: 42 additions & 9 deletions aggregate.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package main

import (
"fmt"
"errors"
"io"
"log"
"net/http"
Expand Down Expand Up @@ -109,7 +109,7 @@ func (a *aggregate) saveFamily(familyName string, family *dto.MetricFamily) erro
return nil
}

func (a *aggregate) parseAndMerge(r io.Reader, job string) error {
func (a *aggregate) parseAndMerge(r io.Reader, labels []labelPair) error {
var parser expfmt.TextParser
inFamilies, err := parser.TextToMetricFamilies(r)
if err != nil {
Expand All @@ -119,7 +119,7 @@ func (a *aggregate) parseAndMerge(r io.Reader, job string) error {
for name, family := range inFamilies {
// Sort labels in case source sends them inconsistently
for _, m := range family.Metric {
a.formatLabels(m, job)
a.formatLabels(m, labels)
}

if err := validateFamily(family); err != nil {
Expand Down Expand Up @@ -190,21 +190,54 @@ func (a *aggregate) encodeMetric(name string, enc expfmt.Encoder) bool {
return false
}

var ErrOddNumberOfLabelParts = errors.New("labels must be defined in pairs")

func (a *aggregate) handleInsert(c *gin.Context) {
job := c.Param("job")
// TODO: add logic to verify correct format of job label
if job == "" {
err := fmt.Errorf("must send in a valid job name, sent: %s", job)
labelParts, jobName, err := parseLabelsInPath(c)
if err != nil {
log.Println(err)
http.Error(c.Writer, err.Error(), http.StatusBadRequest)
return
}

if err := a.parseAndMerge(c.Request.Body, job); err != nil {
if err := a.parseAndMerge(c.Request.Body, labelParts); err != nil {
log.Println(err)
http.Error(c.Writer, err.Error(), http.StatusBadRequest)
return
}

MetricPushes.WithLabelValues(job).Inc()
MetricPushes.WithLabelValues(jobName).Inc()
c.Status(http.StatusAccepted)
}

type labelPair struct {
name, value string
}

func parseLabelsInPath(c *gin.Context) ([]labelPair, string, error) {
labelString := c.Param("labels")
labelString = strings.Trim(labelString, "/")
if labelString == "" {
return nil, "", nil
}

labelParts := strings.Split(labelString, "/")
if len(labelParts)%2 != 0 {
return nil, "", ErrOddNumberOfLabelParts
}

var (
labelPairs []labelPair
jobName string
)
for idx := 0; idx < len(labelParts); idx += 2 {
name := labelParts[idx]
value := labelParts[idx+1]
labelPairs = append(labelPairs, labelPair{name, value})
if name == "job" {
jobName = value
}
}

return labelPairs, jobName, nil
}
20 changes: 12 additions & 8 deletions aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ counter{a="a",b="b",job="test"} 3
`
)

var testLabels = []labelPair{
{"job", "test"},
}

func TestAggregate(t *testing.T) {
for _, c := range []struct {
testName string
Expand All @@ -174,12 +178,12 @@ func TestAggregate(t *testing.T) {
} {
t.Run(c.testName, func(t *testing.T) {
agg := newAggregate(AddIgnoredLabels(c.ignoredLabels...))
router := setupAPIRouter("*", agg, metrics.Config{ Registry: prometheus.NewRegistry()})
router := setupAPIRouter("*", agg, metrics.Config{Registry: prometheus.NewRegistry()})

err := agg.parseAndMerge(strings.NewReader(c.a), "test")
err := agg.parseAndMerge(strings.NewReader(c.a), testLabels)
require.NoError(t, err)

err = agg.parseAndMerge(strings.NewReader(c.b), "test")
err = agg.parseAndMerge(strings.NewReader(c.b), testLabels)
require.NoError(t, err)

w := httptest.NewRecorder()
Expand All @@ -203,7 +207,7 @@ func TestAggregate(t *testing.T) {
t.Run("duplicateLabels", func(t *testing.T) {
agg := newAggregate()

err := agg.parseAndMerge(strings.NewReader(duplicateLabels), "test")
err := agg.parseAndMerge(strings.NewReader(duplicateLabels), testLabels)
require.Equal(t, err.Error(), duplicateError)
})
}
Expand All @@ -228,10 +232,10 @@ func BenchmarkAggregate(b *testing.B) {
a.options.ignoredLabels = v.ignoredLabels
b.Run(fmt.Sprintf("metric_type_%s", v.inputName), func(b *testing.B) {
for n := 0; n < b.N; n++ {
if err := a.parseAndMerge(strings.NewReader(v.input1), "test"); err != nil {
if err := a.parseAndMerge(strings.NewReader(v.input1), testLabels); err != nil {
b.Fatalf("unexpected error %s", err)
}
if err := a.parseAndMerge(strings.NewReader(v.input2), "test"); err != nil {
if err := a.parseAndMerge(strings.NewReader(v.input2), testLabels); err != nil {
b.Fatalf("unexpected error %s", err)
}
}
Expand All @@ -244,15 +248,15 @@ func BenchmarkConcurrentAggregate(b *testing.B) {
for _, v := range testMetricTable {
a.options.ignoredLabels = v.ignoredLabels
b.Run(fmt.Sprintf("metric_type_%s", v.inputName), func(b *testing.B) {
if err := a.parseAndMerge(strings.NewReader(v.input1), "test"); err != nil {
if err := a.parseAndMerge(strings.NewReader(v.input1), testLabels); err != nil {
b.Fatalf("unexpected error %s", err)
}

for n := 0; n < b.N; n++ {
g, _ := errgroup.WithContext(context.Background())
for tN := 0; tN < 10; tN++ {
g.Go(func() error {
return a.parseAndMerge(strings.NewReader(v.input2), "test")
return a.parseAndMerge(strings.NewReader(v.input2), testLabels)
})
}

Expand Down
20 changes: 6 additions & 14 deletions labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,17 @@ import (

func strPtr(s string) *string {
return &s

}

var JobLabel = strPtr("job")

func addJobLabel(m *dto.Metric, job string) {
if len(m.Label) > 0 {
for _, l := range m.Label {
if l.GetName() == "job" {
l.Value = strPtr(job)
return
}
}
func addLabels(m *dto.Metric, labels []labelPair) {
for _, label := range labels {
pair := dto.LabelPair{Name: strPtr(label.name), Value: strPtr(label.value)}
m.Label = append(m.Label, &pair)
}
m.Label = append(m.Label, &dto.LabelPair{Name: JobLabel, Value: strPtr(job)})
}

func (a *aggregate) formatLabels(m *dto.Metric, job string) {
addJobLabel(m, job)
func (a *aggregate) formatLabels(m *dto.Metric, labels []labelPair) {
addLabels(m, labels)
sort.Sort(byName(m.Label))

if len(a.options.ignoredLabels) > 0 {
Expand Down
7 changes: 4 additions & 3 deletions labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ func TestFormatLabels(t *testing.T) {
{},
},
}
a.formatLabels(m, "test")
a.formatLabels(m, []labelPair{{"job", "test"}, {"thing3", "value3"}})

assert.Equal(t, &dto.LabelPair{Name: strPtr("job"), Value: strPtr("test")}, m.Label[0])
assert.Equal(t, &dto.LabelPair{Name: strPtr("thing1"), Value: strPtr("value1")}, m.Label[1])
assert.Equal(t, &dto.LabelPair{Name: strPtr("thing2"), Value: strPtr("value2")}, m.Label[2])
assert.Len(t, m.Label, 3)
assert.Equal(t, &dto.LabelPair{Name: strPtr("thing3"), Value: strPtr("value3")}, m.Label[3])
assert.Len(t, m.Label, 4)

}

Expand Down Expand Up @@ -84,7 +85,7 @@ func BenchmarkFormatLabels(b *testing.B) {
a := newAggregate(AddIgnoredLabels(v.ignoredLabels...))
b.Run(fmt.Sprintf("metric_type_%s", v.inputName), func(b *testing.B) {
for n := 0; n < b.N; n++ {
a.formatLabels(v.m, "test")
a.formatLabels(v.m, testLabels)
}
})
}
Expand Down
15 changes: 11 additions & 4 deletions publicRouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@ func setupAPIRouter(corsDomain string, agg *aggregate, promConfig metrics.Config
})

r := gin.New()
r.RedirectTrailingSlash = false

r.GET("/metrics",
mGin.Handler("metrics", metricsMiddleware),
mGin.Handler("getMetrics", metricsMiddleware),
cors.New(corsConfig),
agg.handleRender)
r.POST("/metrics/job/:job",
mGin.Handler("/metrics/job", metricsMiddleware),
agg.handleInsert)

insertHandler := mGin.Handler("postMetrics", metricsMiddleware)
insertMethods := []func(string, ...gin.HandlerFunc) gin.IRoutes{r.POST, r.PUT}
insertPaths := []string{"/metrics", "/metrics/*labels"}
for _, method := range insertMethods {
for _, path := range insertPaths {
method(path, insertHandler, agg.handleInsert)
}
}

return r
}
84 changes: 84 additions & 0 deletions router_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
metrics "github.com/slok/go-http-metrics/metrics/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -31,3 +35,83 @@ func TestHealthCheck(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, true, response.IsAlive)
}

func TestMultiLabelPosting(t *testing.T) {
tests := []struct {
name string
path, metric string
expected string
}{
{
"multiple labels",
"/metrics/label1/value1/label2/value2",
`# TYPE some_counter counter
some_counter 1
`,
`# TYPE some_counter counter
some_counter{label1="value1",label2="value2"} 1
`},
{
"job label",
"/metrics/job/someJob",
`# TYPE some_counter counter
some_counter 1
`,
`# TYPE some_counter counter
some_counter{job="someJob"} 1
`,
},
{
"no labels, no trailing slash",
"/metrics",
"# TYPE some_counter counter\nsome_counter 1\n",
"# TYPE some_counter counter\nsome_counter 1\n",
},
{
"no labels, trailing slash",
"/metrics/",
"# TYPE some_counter counter\nsome_counter 1\n",
"# TYPE some_counter counter\nsome_counter 1\n",
},
{
"duplicate labels",
"/metrics/testing/one/testing/two/testing/three",
"# TYPE some_counter counter\n some_counter 1\n",
"# TYPE some_counter counter\nsome_counter{testing=\"one\",testing=\"two\",testing=\"three\"} 1\n",
},
}

for idx, test := range tests {
t.Run(fmt.Sprintf("test #%d: %s", idx+1, test), func(t *testing.T) {
// setup router
agg := newAggregate()
promConfig := metrics.Config{
Registry: prometheus.NewRegistry(),
}
router := setupAPIRouter("https://cors-domain", agg, promConfig)

// ---- insert metric ----
// setup request
buf := bytes.NewBufferString(test.metric)
req, err := http.NewRequest("PUT", test.path, buf)
require.NoError(t, err)

// make request
w := httptest.NewRecorder()
router.ServeHTTP(w, req)

assert.Equal(t, 202, w.Code)

// ---- retrieve metric ----
req, err = http.NewRequest("GET", "/metrics", nil)
require.NoError(t, err)

w = httptest.NewRecorder()
router.ServeHTTP(w, req)

assert.Equal(t, 200, w.Code)
body := w.Body.String()
assert.Equal(t, test.expected, body)
})
}
}

0 comments on commit 7c1e25b

Please sign in to comment.