Skip to content

Commit

Permalink
simplify part of code, add tests, make struct fields exported
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryk-dk committed Sep 4, 2023
1 parent 7bf2aca commit 7e22c61
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 51 deletions.
97 changes: 56 additions & 41 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,71 +11,69 @@ import (
"time"
)

// PushConfig is config for pushing registered metrics to the given pushURL with the given interval.
// PushConfig is config for pushing registered metrics to the given PushURL with the given Interval.
//
// PushURL and Interval are required fields
type PushConfig struct {
// headers contain optional http request headers
headers http.Header
// PushURL defines URL where metrics would be pushed.
PushURL string
// Interval determines the frequency of pushing metrics.
Interval time.Duration

// Headers contain optional http request Headers
Headers http.Header
// ExtraLabels may contain comma-separated list of `label="value"` labels, which will be added
// to all the metrics before pushing them to PushURL.
ExtraLabels string
// WriteMetrics defines the function to write metrics
WriteMetrics func(w io.Writer)

// pushURL defines URL where metrics would be pushed
pushURL *url.URL
// interval determines the frequency of pushing metrics
interval time.Duration
// extraLabels may contain comma-separated list of `label="value"` labels, which will be added
// to all the metrics before pushing them to pushURL.
extraLabels string
// writeMetrics defines the function to write metrics
writeMetrics func(w io.Writer)
}

func New(pushURL string, interval time.Duration, extraLabels string, writeMetrics func(w io.Writer), headers http.Header) (*PushConfig, error) {
if interval <= 0 {
return nil, fmt.Errorf("interval must be positive; got %s", interval)
// Validate checks the defined fields and returns error if some of the field
// has incorrect value
func (pc *PushConfig) Validate() error {
if pc.Interval <= 0 {
return fmt.Errorf("interval must be positive; got %s", pc.Interval)
}
if err := validateTags(extraLabels); err != nil {
return nil, fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err)
if err := validateTags(pc.ExtraLabels); err != nil {
return fmt.Errorf("invalid ExtraLabels=%q: %w", pc.ExtraLabels, err)
}
pu, err := url.Parse(pushURL)
pu, err := parseURL(pc.PushURL)
if err != nil {
return nil, fmt.Errorf("cannot parse pushURL=%q: %w", pushURL, err)
}
if pu.Scheme != "http" && pu.Scheme != "https" {
return nil, fmt.Errorf("unsupported scheme in pushURL=%q; expecting 'http' or 'https'", pushURL)
}
if pu.Host == "" {
return nil, fmt.Errorf("missing host in pushURL=%q", pushURL)
return fmt.Errorf("field PushURL not valid: %w", err)
}

rp := &PushConfig{
headers: headers,
pushURL: pu,
interval: interval,
extraLabels: extraLabels,
writeMetrics: writeMetrics,
}
return rp, nil
pc.pushURL = pu
return nil
}

// Push run request to the defined pushURL every interval
// Push run request to the defined PushURL every Interval
func (pc *PushConfig) Push() {
pushURLRedacted := pc.pushURL.Redacted()

Check warning on line 54 in config.go

View check run for this annotation

Codecov / codecov/patch

config.go#L53-L54

Added lines #L53 - L54 were not covered by tests
// by default set interval to one second
if pc.Interval == 0 {
pc.Interval = time.Second

Check warning on line 57 in config.go

View check run for this annotation

Codecov / codecov/patch

config.go#L56-L57

Added lines #L56 - L57 were not covered by tests
}
cl := &http.Client{
Timeout: pc.interval,
Timeout: pc.Interval,

Check warning on line 60 in config.go

View check run for this annotation

Codecov / codecov/patch

config.go#L59-L60

Added lines #L59 - L60 were not covered by tests
}
pushesTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_total{url=%q}`, pushURLRedacted))
pushErrorsTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_errors_total{url=%q}`, pushURLRedacted))
bytesPushedTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_bytes_pushed_total{url=%q}`, pushURLRedacted))
pushDuration := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted))
pushBlockSize := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted))
pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(pc.interval.Seconds())
ticker := time.NewTicker(pc.interval)
pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(pc.Interval.Seconds())
ticker := time.NewTicker(pc.Interval)
var bb bytes.Buffer
var tmpBuf []byte
zw := gzip.NewWriter(&bb)
for range ticker.C {
bb.Reset()
pc.writeMetrics(&bb)
if len(pc.extraLabels) > 0 {
tmpBuf = addExtraLabels(tmpBuf[:0], bb.Bytes(), pc.extraLabels)
pc.WriteMetrics(&bb)
if len(pc.ExtraLabels) > 0 {
tmpBuf = addExtraLabels(tmpBuf[:0], bb.Bytes(), pc.ExtraLabels)
bb.Reset()
if _, err := bb.Write(tmpBuf); err != nil {
panic(fmt.Errorf("BUG: cannot write %d bytes to bytes.Buffer: %s", len(tmpBuf), err))

Check warning on line 79 in config.go

View check run for this annotation

Codecov / codecov/patch

config.go#L62-L79

Added lines #L62 - L79 were not covered by tests
Expand Down Expand Up @@ -122,12 +120,29 @@ func (pc *PushConfig) Push() {
}
}

// SetHeaders can be used to set defined headers to the http request
// SetHeaders can be used to set defined Headers to the http request
func (pc *PushConfig) SetHeaders(req *http.Request) {
reqHeaders := req.Header
for key, h := range pc.headers {
for key, h := range pc.Headers {
for _, s := range h {
reqHeaders.Add(key, s)

Check warning on line 128 in config.go

View check run for this annotation

Codecov / codecov/patch

config.go#L124-L128

Added lines #L124 - L128 were not covered by tests
}
}
}

func parseURL(u string) (*url.URL, error) {
if u == "" {
return nil, fmt.Errorf("url cannot br empty")
}
pu, err := url.Parse(u)
if err != nil {
return nil, fmt.Errorf("cannot parse url=%q: %w", u, err)

Check warning on line 139 in config.go

View check run for this annotation

Codecov / codecov/patch

config.go#L139

Added line #L139 was not covered by tests
}
if pu.Scheme != "http" && pu.Scheme != "https" {
return nil, fmt.Errorf("unsupported scheme in url=%q; expecting 'http' or 'https'", u)
}
if pu.Host == "" {
return nil, fmt.Errorf("missing host in url=%q", u)
}
return pu, nil
}
33 changes: 33 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package metrics

import (
"testing"
"time"
)

func TestPushConfigValidateError(t *testing.T) {
f := func(config *PushConfig) {
t.Helper()
if err := config.Validate(); err == nil {
t.Fatalf("expecting non-nil error when validating %v", config)
}
}

f(&PushConfig{})
f(&PushConfig{PushURL: "", Interval: time.Second})
f(&PushConfig{PushURL: "https://localhost:8080", Interval: -1 * time.Second})
f(&PushConfig{PushURL: "htt://localhost:8080", Interval: time.Second})
f(&PushConfig{PushURL: "http://localhost:8080", Interval: time.Second, ExtraLabels: "a{} "})
}

func TestPushConfigValidateSuccess(t *testing.T) {
f := func(config *PushConfig) {
t.Helper()
if err := config.Validate(); err != nil {
t.Fatalf("expecting nil error when validating %v; err: %s", config, err)
}
}

f(&PushConfig{PushURL: "http://localhost:8080", Interval: time.Second})
f(&PushConfig{PushURL: "http://localhost:8080", Interval: time.Second, ExtraLabels: `foo="bar"`})
}
42 changes: 32 additions & 10 deletions push.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,15 @@ func InitPush(pushURL string, interval time.Duration, extraLabels string, pushPr
//
// It is OK calling InitPushWithConfig multiple times with different pushURL -
// in this case metrics are pushed to all the provided pushURL urls.
func InitPushWithConfig(pushConfig *PushConfig, pushProcessMetrics bool) {
pushConfig.writeMetrics = func(w io.Writer) {
func InitPushWithConfig(pushConfig *PushConfig, pushProcessMetrics bool) error {
if err := pushConfig.Validate(); err != nil {
return err

Check warning on line 59 in push.go

View check run for this annotation

Codecov / codecov/patch

push.go#L57-L59

Added lines #L57 - L59 were not covered by tests
}
pushConfig.WriteMetrics = func(w io.Writer) {
WritePrometheus(w, pushProcessMetrics)

Check warning on line 62 in push.go

View check run for this annotation

Codecov / codecov/patch

push.go#L61-L62

Added lines #L61 - L62 were not covered by tests
}
go pushConfig.Push()
return nil

Check warning on line 65 in push.go

View check run for this annotation

Codecov / codecov/patch

push.go#L64-L65

Added lines #L64 - L65 were not covered by tests
}

// InitPush sets up periodic push for metrics from s to the given pushURL with the given interval.
Expand All @@ -81,16 +85,20 @@ func (s *Set) InitPush(pushURL string, interval time.Duration, extraLabels strin
return InitPushExt(pushURL, interval, extraLabels, writeMetrics)
}

// InitPushWithConfig sets up periodic push for globally registered metrics to the given pushURL with the given interval
// InitPushWithConfig sets up periodic push for globally registered metrics to the given PushURL with the given Interval
// defined in the PushConfig
//
// It is OK calling InitPushWithConfig multiple times with different pushURL -
// in this case metrics are pushed to all the provided pushURL urls.
func (s *Set) InitPushWithConfig(pushConfig *PushConfig) {
pushConfig.writeMetrics = func(w io.Writer) {
// It is OK calling InitPushWithConfig multiple times with different PushURL -
// in this case metrics are pushed to all the provided PushURL urls.
func (s *Set) InitPushWithConfig(pushConfig *PushConfig) error {
if err := pushConfig.Validate(); err != nil {
return err

Check warning on line 95 in push.go

View check run for this annotation

Codecov / codecov/patch

push.go#L93-L95

Added lines #L93 - L95 were not covered by tests
}
pushConfig.WriteMetrics = func(w io.Writer) {
s.WritePrometheus(w)

Check warning on line 98 in push.go

View check run for this annotation

Codecov / codecov/patch

push.go#L97-L98

Added lines #L97 - L98 were not covered by tests
}
go pushConfig.Push()
return nil

Check warning on line 101 in push.go

View check run for this annotation

Codecov / codecov/patch

push.go#L100-L101

Added lines #L100 - L101 were not covered by tests
}

// InitPushExt sets up periodic push for metrics obtained by calling writeMetrics with the given interval.
Expand All @@ -110,11 +118,25 @@ func (s *Set) InitPushWithConfig(pushConfig *PushConfig) {
// It is OK calling InitPushExt multiple times with different writeMetrics -
// in this case all the metrics generated by writeMetrics callbacks are written to pushURL.
func InitPushExt(pushURL string, interval time.Duration, extraLabels string, writeMetrics func(w io.Writer)) error {
pc, err := New(pushURL, interval, extraLabels, writeMetrics, nil)
if err != nil {
pc := &PushConfig{
PushURL: pushURL,
Interval: interval,
ExtraLabels: extraLabels,
WriteMetrics: writeMetrics,
}
return InitPushExtWithConfig(pc)
}

// InitPushExtWithConfig sets up periodic push for metrics obtained by calling writeMetrics with the given interval
// defined in the PushConfig.
//
// It is OK calling InitPushExtWithConfig multiple times with different writeMetrics -
// in this case all the metrics generated by writeMetrics callbacks are written to PushURL.
func InitPushExtWithConfig(pushConfig *PushConfig) error {
if err := pushConfig.Validate(); err != nil {
return err
}
go pc.Push()
go pushConfig.Push()

Check warning on line 139 in push.go

View check run for this annotation

Codecov / codecov/patch

push.go#L139

Added line #L139 was not covered by tests
return nil
}

Expand Down

0 comments on commit 7e22c61

Please sign in to comment.