From a11a2f8fa778748a83798a066ebbc78585c6aae2 Mon Sep 17 00:00:00 2001 From: Kevin Burke Date: Thu, 12 Sep 2024 13:06:08 -0700 Subject: [PATCH] stdoutstats: new package This makes it easier to troubleshoot problems with the metrics pipeline, by visualizing what metrics are emitted. Fix the existing go_version and stats_version reporter to ensure that they work even if metrics are reported with ReportAt instead of Incr, Set, Observe etc. --- README.md | 23 ++++- engine.go | 143 +++++++++++++++++--------------- example_test.go | 20 +++++ httpstats/handler_test.go | 9 +- procstats/proc.go | 2 +- stdoutstats/stdoutstats.go | 113 +++++++++++++++++++++++++ stdoutstats/stdoutstats_test.go | 48 +++++++++++ 7 files changed, 284 insertions(+), 74 deletions(-) create mode 100644 example_test.go create mode 100644 stdoutstats/stdoutstats.go create mode 100644 stdoutstats/stdoutstats_test.go diff --git a/README.md b/README.md index 7f0b0a8..b6798dd 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Installation go get github.com/segmentio/stats/v5 ``` -Migration to v5 +Migration to v4/v5 --------------- Version 4 of the stats package introduced a new way of producing metrics based @@ -47,7 +47,7 @@ To avoid greatly increasing the complexity of the codebase some old APIs were removed in favor of this new approach, other were transformed to provide more flexibility and leverage new features. -The stats package used to only support float values, metrics can now be of +The stats package used to only support float values. Metrics can now be of various numeric types (see stats.MakeMeasure for a detailed description), therefore functions like `stats.Add` now accept an `interface{}` value instead of `float64`. `stats.ObserveDuration` was also removed since this new approach @@ -176,6 +176,23 @@ func main() { } ``` +### Troubleshooting + +Use the `stdoutstats` package to print all stats to the console. + +```go +handler := stdoutstats.Client{Dst: os.Stdout} +engine := stats.NewEngine("engine-name", handler) +engine.Incr("server.start") +``` + +You can use the `Grep` property to filter the printed metrics for only ones you +care about: + +```go +handler := stdoutstats.Client{Dst: os.Stdout, Grep: regexp.MustCompile("server.start")} +``` + Monitoring ---------- @@ -300,7 +317,7 @@ func main() { ``` You can also modify the default HTTP client to automatically get metrics for all -packages using it, this is very convinient to get insights into dependencies. +packages using it, this is very convenient to get insights into dependencies. ```go package main diff --git a/engine.go b/engine.go index 35d4ac2..0cae67d 100644 --- a/engine.go +++ b/engine.go @@ -60,86 +60,86 @@ func NewEngine(prefix string, handler Handler, tags ...Tag) *Engine { return e } -// Register adds handler to eng. -func (eng *Engine) Register(handler Handler) { - if eng.Handler == Discard { - eng.Handler = handler +// Register adds handler to e. +func (e *Engine) Register(handler Handler) { + if e.Handler == Discard { + e.Handler = handler } else { - eng.Handler = MultiHandler(eng.Handler, handler) + e.Handler = MultiHandler(e.Handler, handler) } } // Flush flushes eng's handler (if it implements the Flusher interface). -func (eng *Engine) Flush() { - flush(eng.Handler) +func (e *Engine) Flush() { + flush(e.Handler) } // WithPrefix returns a copy of the engine with prefix appended to eng's current // prefix and tags set to the merge of eng's current tags and those passed as // argument. Both eng and the returned engine share the same handler. -func (eng *Engine) WithPrefix(prefix string, tags ...Tag) *Engine { +func (e *Engine) WithPrefix(prefix string, tags ...Tag) *Engine { return &Engine{ - Handler: eng.Handler, - Prefix: eng.makeName(prefix), - Tags: mergeTags(eng.Tags, tags), + Handler: e.Handler, + Prefix: e.makeName(prefix), + Tags: mergeTags(e.Tags, tags), } } // WithTags returns a copy of the engine with tags set to the merge of eng's // current tags and those passed as arguments. Both eng and the returned engine // share the same handler. -func (eng *Engine) WithTags(tags ...Tag) *Engine { - return eng.WithPrefix("", tags...) +func (e *Engine) WithTags(tags ...Tag) *Engine { + return e.WithPrefix("", tags...) } // Incr increments by one the counter identified by name and tags. -func (eng *Engine) Incr(name string, tags ...Tag) { - eng.Add(name, 1, tags...) +func (e *Engine) Incr(name string, tags ...Tag) { + e.Add(name, 1, tags...) } // IncrAt increments by one the counter identified by name and tags. -func (eng *Engine) IncrAt(time time.Time, name string, tags ...Tag) { - eng.AddAt(time, name, 1, tags...) +func (e *Engine) IncrAt(time time.Time, name string, tags ...Tag) { + e.AddAt(time, name, 1, tags...) } // Add increments by value the counter identified by name and tags. -func (eng *Engine) Add(name string, value interface{}, tags ...Tag) { - eng.measure(time.Now(), name, value, Counter, tags...) +func (e *Engine) Add(name string, value interface{}, tags ...Tag) { + e.measure(time.Now(), name, value, Counter, tags...) } // AddAt increments by value the counter identified by name and tags. -func (eng *Engine) AddAt(t time.Time, name string, value interface{}, tags ...Tag) { - eng.measure(t, name, value, Counter, tags...) +func (e *Engine) AddAt(t time.Time, name string, value interface{}, tags ...Tag) { + e.measure(t, name, value, Counter, tags...) } // Set sets to value the gauge identified by name and tags. -func (eng *Engine) Set(name string, value interface{}, tags ...Tag) { - eng.measure(time.Now(), name, value, Gauge, tags...) +func (e *Engine) Set(name string, value interface{}, tags ...Tag) { + e.measure(time.Now(), name, value, Gauge, tags...) } // SetAt sets to value the gauge identified by name and tags. -func (eng *Engine) SetAt(t time.Time, name string, value interface{}, tags ...Tag) { - eng.measure(t, name, value, Gauge, tags...) +func (e *Engine) SetAt(t time.Time, name string, value interface{}, tags ...Tag) { + e.measure(t, name, value, Gauge, tags...) } // Observe reports value for the histogram identified by name and tags. -func (eng *Engine) Observe(name string, value interface{}, tags ...Tag) { - eng.measure(time.Now(), name, value, Histogram, tags...) +func (e *Engine) Observe(name string, value interface{}, tags ...Tag) { + e.measure(time.Now(), name, value, Histogram, tags...) } // ObserveAt reports value for the histogram identified by name and tags. -func (eng *Engine) ObserveAt(t time.Time, name string, value interface{}, tags ...Tag) { - eng.measure(t, name, value, Histogram, tags...) +func (e *Engine) ObserveAt(t time.Time, name string, value interface{}, tags ...Tag) { + e.measure(t, name, value, Histogram, tags...) } // Clock returns a new clock identified by name and tags. -func (eng *Engine) Clock(name string, tags ...Tag) *Clock { - return eng.ClockAt(name, time.Now(), tags...) +func (e *Engine) Clock(name string, tags ...Tag) *Clock { + return e.ClockAt(name, time.Now(), tags...) } // ClockAt returns a new clock identified by name and tags with a specified // start time. -func (eng *Engine) ClockAt(name string, start time.Time, tags ...Tag) *Clock { +func (e *Engine) ClockAt(name string, start time.Time, tags ...Tag) *Clock { cpy := make([]Tag, len(tags), len(tags)+1) // clock always appends a stamp. copy(cpy, tags) return &Clock{ @@ -147,7 +147,7 @@ func (eng *Engine) ClockAt(name string, start time.Time, tags ...Tag) *Clock { first: start, last: start, tags: cpy, - eng: eng, + eng: e, } } @@ -161,38 +161,48 @@ var truthyValues = map[string]bool{ var GoVersionReportingEnabled = !truthyValues[os.Getenv("STATS_DISABLE_GO_VERSION_REPORTING")] -func (eng *Engine) measure(t time.Time, name string, value interface{}, ftype FieldType, tags ...Tag) { - if GoVersionReportingEnabled { - eng.once.Do(func() { - vsn := strings.TrimPrefix(runtime.Version(), "go") - parts := strings.Split(vsn, ".") - // this filters out weird compiled Go versions like tip. len(parts) - // may equal 2 because older Go version might be "go1.13" - if len(parts) == 2 || len(parts) == 3 { - eng.Handler.HandleMeasures(t, Measure{ +func (e *Engine) reportVersionOnce(t time.Time) { + if !GoVersionReportingEnabled { + return + } + // We can't do this when we create the engine because it's possible to + // configure it after creation time with e.g. the Register function. So + // instead we try to do it at the moment you try to send your first metric. + e.once.Do(func() { + vsn := strings.TrimPrefix(runtime.Version(), "go") + parts := strings.Split(vsn, ".") + // We don't want to report weird compiled Go versions like tip. + // len(parts) may equal 2 because an older Go version might be "go1.13" + // instead of "go1.13.1" + if len(parts) == 2 || len(parts) == 3 { + e.Handler.HandleMeasures(t, + Measure{ Name: "go_version", Fields: []Field{{ - Name: "go_version", + Name: "value", Value: intValue(1), }}, Tags: []Tag{ {"go_version", vsn}, }, }, - Measure{ - Name: "stats_version", - Fields: []Field{{ - Name: "stats_version", - Value: intValue(1), - }}, - Tags: []Tag{ - {"stats_version", version.Version}, - }, + Measure{ + Name: "stats_version", + Fields: []Field{{ + Name: "value", + Value: intValue(1), + }}, + Tags: []Tag{ + {"stats_version", version.Version}, }, - ) - } - }) - } + }, + ) + } + }) +} + +func (eng *Engine) measure(t time.Time, name string, value interface{}, ftype FieldType, tags ...Tag) { + eng.reportVersionOnce(t) eng.measureOne(t, name, value, ftype, tags...) } @@ -224,8 +234,8 @@ func (eng *Engine) measureOne(t time.Time, name string, value interface{}, ftype measureArrayPool.Put(mp) } -func (eng *Engine) makeName(name string) string { - return concat(eng.Prefix, name) +func (e *Engine) makeName(name string) string { + return concat(e.Prefix, name) } var measureArrayPool = sync.Pool{ @@ -233,34 +243,35 @@ var measureArrayPool = sync.Pool{ } // Report calls ReportAt with time.Now() as first argument. -func (eng *Engine) Report(metrics interface{}, tags ...Tag) { - eng.ReportAt(time.Now(), metrics, tags...) +func (e *Engine) Report(metrics interface{}, tags ...Tag) { + e.ReportAt(time.Now(), metrics, tags...) } // ReportAt reports a set of metrics for a given time. The metrics must be of // type struct, pointer to struct, or a slice or array to one of those. See // MakeMeasures for details about how to make struct types exposing metrics. -func (eng *Engine) ReportAt(time time.Time, metrics interface{}, tags ...Tag) { +func (e *Engine) ReportAt(t time.Time, metrics interface{}, tags ...Tag) { + e.reportVersionOnce(t) var tb *tagsBuffer if len(tags) == 0 { // fast path for the common case where there are no dynamic tags - tags = eng.Tags + tags = e.Tags } else { tb = tagsPool.Get().(*tagsBuffer) tb.append(tags...) - tb.append(eng.Tags...) - if !eng.AllowDuplicateTags { + tb.append(e.Tags...) + if !e.AllowDuplicateTags { tb.sort() } tags = tb.tags } mb := measurePool.Get().(*measuresBuffer) - mb.measures = appendMeasures(mb.measures[:0], &eng.cache, eng.Prefix, reflect.ValueOf(metrics), tags...) + mb.measures = appendMeasures(mb.measures[:0], &e.cache, e.Prefix, reflect.ValueOf(metrics), tags...) ms := mb.measures - eng.Handler.HandleMeasures(time, ms...) + e.Handler.HandleMeasures(t, ms...) for i := range ms { ms[i].reset() diff --git a/example_test.go b/example_test.go new file mode 100644 index 0000000..c6a4e2e --- /dev/null +++ b/example_test.go @@ -0,0 +1,20 @@ +package stats_test + +import ( + "os" + + stats "github.com/segmentio/stats/v5" + "github.com/segmentio/stats/v5/stdoutstats" +) + +func Example() { + handler := &stdoutstats.Client{Dst: os.Stdout} + engine := stats.NewEngine("engine-name", handler) + // Will print: + // + // 2024-12-18T14:53:57-08:00 engine-name.server.start:1|c + // + // to the console. + engine.Incr("server.start") + engine.Flush() +} diff --git a/httpstats/handler_test.go b/httpstats/handler_test.go index 87f6588..a0717c3 100644 --- a/httpstats/handler_test.go +++ b/httpstats/handler_test.go @@ -31,14 +31,15 @@ func TestHandler(t *testing.T) { io.ReadAll(res.Body) res.Body.Close() + e.Flush() measures := h.Measures() if len(measures) == 0 { t.Error("no measures reported by http handler") } + tagSeen := false for _, m := range measures { - tagSeen := false for _, tag := range m.Tags { if tag.Name == "bucket" { switch tag.Value { @@ -54,9 +55,9 @@ func TestHandler(t *testing.T) { } } } - if !tagSeen { - t.Errorf("did not see user-added tag for wrapped request. tags: %#v\n%#v", m, m.Tags) - } + } + if !tagSeen { + t.Errorf("did not see user-added tag for wrapped request. measures: %#v", measures) } for _, m := range measures { diff --git a/procstats/proc.go b/procstats/proc.go index ff37e67..92beb6f 100644 --- a/procstats/proc.go +++ b/procstats/proc.go @@ -40,7 +40,7 @@ type procCPU struct { type procMemory struct { // Memory - available uint64 `metric:"available.bytes" type:"gauge"` // amound of RAM available to the process + available uint64 `metric:"available.bytes" type:"gauge"` // amount of RAM available to the process size uint64 `metric:"total.bytes" type:"gauge"` // total program memory (including virtual mappings) resident struct { // resident set size diff --git a/stdoutstats/stdoutstats.go b/stdoutstats/stdoutstats.go new file mode 100644 index 0000000..122d939 --- /dev/null +++ b/stdoutstats/stdoutstats.go @@ -0,0 +1,113 @@ +package stdoutstats + +import ( + "fmt" + "io" + "math" + "os" + "regexp" + "strconv" + "time" + + "github.com/segmentio/stats/v5" +) + +// Client will print out received metrics. If Dst is nil, metrics will be +// printed to stdout, otherwise they will be printed to Dst. +// +// You can optionally provide a Grep regexp to limit printed metrics to ones +// matching the regular expression. +type Client struct { + Dst io.Writer + Grep *regexp.Regexp +} + +func (c *Client) Write(p []byte) (int, error) { + if c.Dst == nil { + return os.Stdout.Write(p) + } + return c.Dst.Write(p) +} + +func normalizeFloat(f float64) float64 { + switch { + case math.IsNaN(f): + return 0.0 + case math.IsInf(f, +1): + return +math.MaxFloat64 + case math.IsInf(f, -1): + return -math.MaxFloat64 + default: + return f + } +} +func appendMeasure(b []byte, m stats.Measure) []byte { + for _, field := range m.Fields { + b = append(b, m.Name...) + if len(field.Name) != 0 { + b = append(b, '.') + b = append(b, field.Name...) + } + b = append(b, ':') + + switch v := field.Value; v.Type() { + case stats.Bool: + if v.Bool() { + b = append(b, '1') + } else { + b = append(b, '0') + } + case stats.Int: + b = strconv.AppendInt(b, v.Int(), 10) + case stats.Uint: + b = strconv.AppendUint(b, v.Uint(), 10) + case stats.Float: + b = strconv.AppendFloat(b, normalizeFloat(v.Float()), 'g', -1, 64) + case stats.Duration: + b = strconv.AppendFloat(b, v.Duration().Seconds(), 'g', -1, 64) + default: + b = append(b, '0') + } + + switch field.Type() { + case stats.Counter: + b = append(b, '|', 'c') + case stats.Gauge: + b = append(b, '|', 'g') + default: + b = append(b, '|', 'd') + } + + if n := len(m.Tags); n != 0 { + b = append(b, '|', '#') + + for i, t := range m.Tags { + if i != 0 { + b = append(b, ',') + } + b = append(b, t.Name...) + b = append(b, ':') + b = append(b, t.Value...) + } + } + + b = append(b, '\n') + } + + return b +} + +func (c *Client) HandleMeasures(t time.Time, measures ...stats.Measure) { + for i := range measures { + m := &measures[i] + + // Process and output the measure + out := make([]byte, 0) + out = appendMeasure(out, *m) + if c.Grep != nil && !c.Grep.Match(out) { + continue // Skip this measure + } + + fmt.Fprintf(c, "%s %s", t.Format(time.RFC3339), out) + } +} diff --git a/stdoutstats/stdoutstats_test.go b/stdoutstats/stdoutstats_test.go new file mode 100644 index 0000000..ba192ea --- /dev/null +++ b/stdoutstats/stdoutstats_test.go @@ -0,0 +1,48 @@ +package stdoutstats + +import ( + "bytes" + "regexp" + "strings" + "testing" + + "github.com/segmentio/stats/v5" +) + +func TestStdout(t *testing.T) { + var buf bytes.Buffer + s := &Client{Dst: &buf} + stats.Register(s) + stats.Set("blah", 7) + stats.Observe("compression_ratio", 0.3, stats.T("file_size_bucket", "bucket_name"), stats.T("algorithm", "jwt256")) + bufstr := buf.String() + want := "stdoutstats.test.compression_ratio:0.3|d|#algorithm:jwt256,file_size_bucket:bucket_name\n" + if !strings.HasSuffix(bufstr, want) { + t.Errorf("stdoutstats: got %v want %v", bufstr, want) + } +} + +func TestStdoutGrepMatch(t *testing.T) { + var buf bytes.Buffer + s := &Client{ + Dst: &buf, + Grep: regexp.MustCompile(`compression_ratio`), + } + eng := stats.NewEngine("prefix", s) + + // Send measures that match and don't match the Grep pattern + eng.Set("compression_ratio", 0.3) + eng.Set("other_metric", 42) + eng.Flush() + + bufstr := buf.String() + + // Check that only the matching measure is output + if !strings.Contains(bufstr, "compression_ratio:0.3") { + t.Errorf("stdoutstats: expected output to contain 'compression_ratio:0.3', but it did not. Output: %s", bufstr) + } + + if strings.Contains(bufstr, "other_metric") { + t.Errorf("stdoutstats: expected output not to contain 'other_metric', but it did. Output: %s", bufstr) + } +}