From 4299d7d4102cb688fe14b3e523f3d6ea3b46472d Mon Sep 17 00:00:00 2001
From: Samuel Attwood <45669855+samuelattwood@users.noreply.github.com>
Date: Thu, 30 Nov 2023 01:32:04 -0500
Subject: [PATCH] Allow concurrent polling (#170)

* Add flightgroup to Collect method

* Fix issue with deferred Collects. Update test for concurrent access

* Improve tests

* Update variable name
---
 go.mod                      |   2 +-
 go.sum                      |   6 +
 surveyor/collector_statz.go | 397 ++++++++++++++++++++----------------
 surveyor/surveyor.go        |  24 +--
 surveyor/surveyor_test.go   |  60 +++++-
 5 files changed, 282 insertions(+), 207 deletions(-)

diff --git a/go.mod b/go.mod
index 11619bd..f49c191 100644
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,7 @@ require (
 	github.com/nats-io/nuid v1.0.1
 	github.com/prometheus/client_golang v1.17.0
 	github.com/prometheus/client_model v0.5.0
+	github.com/prometheus/common v0.44.0
 	github.com/sirupsen/logrus v1.9.3
 	github.com/spf13/cobra v1.7.0
 	github.com/spf13/viper v1.16.0
@@ -37,7 +38,6 @@ require (
 	github.com/nats-io/nkeys v0.4.6 // indirect
 	github.com/pelletier/go-toml/v2 v2.0.8 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
-	github.com/prometheus/common v0.44.0 // indirect
 	github.com/prometheus/procfs v0.11.1 // indirect
 	github.com/spf13/afero v1.9.5 // indirect
 	github.com/spf13/cast v1.5.1 // indirect
diff --git a/go.sum b/go.sum
index dd13384..44a6337 100644
--- a/go.sum
+++ b/go.sum
@@ -67,6 +67,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y
 github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
+github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
 github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
 github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
 github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
@@ -113,6 +114,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
 github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@@ -147,9 +149,11 @@ github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQs
 github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
 github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
 github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
@@ -187,6 +191,7 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa
 github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
+github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
 github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
@@ -523,6 +528,7 @@ google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
 gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
 gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
diff --git a/surveyor/collector_statz.go b/surveyor/collector_statz.go
index 3b8d738..e3fe705 100644
--- a/surveyor/collector_statz.go
+++ b/surveyor/collector_statz.go
@@ -26,6 +26,7 @@ import (
 	"github.com/nats-io/nats.go"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
+	"golang.org/x/sync/singleflight"
 )
 
 // statzDescs holds the metric descriptions
@@ -115,6 +116,7 @@ type statzDescs struct {
 // StatzCollector collects statz from a server deployment
 type StatzCollector struct {
 	sync.Mutex
+	flightGroup singleflight.Group
 	nc          *nats.Conn
 	logger      *logrus.Logger
 	start       time.Time
@@ -397,14 +399,6 @@ func NewStatzCollector(nc *nats.Conn, logger *logrus.Logger, numServers int, ser
 	return sc
 }
 
-// Polling determines if the collector is in a polling cycle
-func (sc *StatzCollector) Polling() bool {
-	sc.Lock()
-	defer sc.Unlock()
-
-	return sc.polling
-}
-
 func (sc *StatzCollector) handleResponse(msg *nats.Msg) {
 	m := &server.ServerStatsMsg{}
 	if err := json.Unmarshal(msg.Data, m); err != nil {
@@ -416,7 +410,12 @@ func (sc *StatzCollector) handleResponse(msg *nats.Msg) {
 	isCurrent := strings.HasSuffix(msg.Subject, sc.pollkey)
 	rtt := time.Since(sc.start)
 	if sc.polling && isCurrent { //nolint
-		sc.statsChan <- m
+		// Avoid edge-case deadlock if message comes in after poll() receive loop completes
+		// but before lock is re-acquired
+		select {
+		case <-time.After(500 * time.Millisecond):
+		case sc.statsChan <- m:
+		}
 		sc.rtts[m.Server.ID] = rtt
 	} else if !isCurrent {
 		sc.logger.Infof("Late reply for server [%15s : %15s : %s]: %v", m.Server.Cluster, serverName(m), m.Server.ID, rtt)
@@ -868,198 +867,250 @@ func (sc *StatzCollector) Describe(ch chan<- *prometheus.Desc) {
 	sc.noReplies.Describe(ch)
 }
 
-func newGaugeMetric(desc *prometheus.Desc, value float64, labels []string) prometheus.Metric {
-	return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labels...)
+type metricSlice struct {
+	sync.Mutex
+	metrics []prometheus.Metric
 }
 
-func newCounterMetric(desc *prometheus.Desc, value float64, labels []string) prometheus.Metric {
-	return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labels...)
+func (ms *metricSlice) appendMetric(m prometheus.Metric) {
+	ms.Lock()
+	defer ms.Unlock()
+	ms.metrics = append(ms.metrics, m)
 }
 
-func (sc *StatzCollector) newNatsUpGaugeMetric(value bool) prometheus.Metric {
-	var fval float64
-	if value {
-		fval = 1
-	}
-	return prometheus.MustNewConstMetric(sc.natsUp, prometheus.GaugeValue, fval)
+func (ms *metricSlice) newGaugeMetric(desc *prometheus.Desc, value float64, labels []string) {
+	m := prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labels...)
+
+	ms.appendMetric(m)
+}
+
+func (ms *metricSlice) newCounterMetric(desc *prometheus.Desc, value float64, labels []string) {
+	m := prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labels...)
+
+	ms.appendMetric(m)
 }
 
 // Collect gathers the streaming server serverz metrics.
 func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) {
-	timer := prometheus.NewTimer(sc.pollTime.WithLabelValues())
-	defer func() {
-		timer.ObserveDuration()
-		sc.pollTime.Collect(ch)
-		sc.pollErrCnt.Collect(ch)
-		sc.surveyedCnt.Collect(ch)
-		sc.expectedCnt.Collect(ch)
-		sc.lateReplies.Collect(ch)
-		sc.noReplies.Collect(ch)
-	}()
-
-	// poll the servers
-	if err := sc.poll(); err != nil {
-		sc.logger.Warnf("Error polling NATS server: %v", err)
-		sc.pollErrCnt.WithLabelValues().Inc()
-		ch <- sc.newNatsUpGaugeMetric(false)
-		return
-	}
+	// Run in flightgroup to allow simultaneous query
+	result, err, _ := sc.flightGroup.Do("collect", func() (interface{}, error) {
+		metrics := &metricSlice{
+			metrics: make([]prometheus.Metric, 0),
+			Mutex:   sync.Mutex{},
+		}
 
-	// lock the stats
-	sc.Lock()
-	defer sc.Unlock()
+		timer := prometheus.NewTimer(sc.pollTime.WithLabelValues())
 
-	ch <- sc.newNatsUpGaugeMetric(true)
-	sc.surveyedCnt.WithLabelValues().Set(0)
-
-	for _, sm := range sc.stats {
-		sc.surveyedCnt.WithLabelValues().Inc()
-
-		ch <- newGaugeMetric(sc.descs.Info, 1, sc.serverInfoLabelValues(sm))
-
-		labels := sc.serverLabelValues(sm)
-		ch <- newGaugeMetric(sc.descs.Start, float64(sm.Stats.Start.UnixNano()), labels)
-		ch <- newGaugeMetric(sc.descs.Uptime, time.Since(sm.Stats.Start).Seconds(), labels)
-		ch <- newGaugeMetric(sc.descs.Mem, float64(sm.Stats.Mem), labels)
-		ch <- newGaugeMetric(sc.descs.Cores, float64(sm.Stats.Cores), labels)
-		ch <- newGaugeMetric(sc.descs.CPU, sm.Stats.CPU, labels)
-		ch <- newGaugeMetric(sc.descs.Connections, float64(sm.Stats.Connections), labels)
-		ch <- newGaugeMetric(sc.descs.TotalConnections, float64(sm.Stats.TotalConnections), labels)
-		ch <- newGaugeMetric(sc.descs.ActiveAccounts, float64(sm.Stats.ActiveAccounts), labels)
-		ch <- newGaugeMetric(sc.descs.NumSubs, float64(sm.Stats.NumSubs), labels)
-		ch <- newGaugeMetric(sc.descs.SentMsgs, float64(sm.Stats.Sent.Msgs), labels)
-		ch <- newGaugeMetric(sc.descs.SentBytes, float64(sm.Stats.Sent.Bytes), labels)
-		ch <- newGaugeMetric(sc.descs.RecvMsgs, float64(sm.Stats.Received.Msgs), labels)
-		ch <- newGaugeMetric(sc.descs.RecvBytes, float64(sm.Stats.Received.Bytes), labels)
-		ch <- newGaugeMetric(sc.descs.SlowConsumers, float64(sm.Stats.SlowConsumers), labels)
-		ch <- newGaugeMetric(sc.descs.RTT, float64(sc.rtts[sm.Server.ID]), labels)
-		ch <- newGaugeMetric(sc.descs.Routes, float64(len(sm.Stats.Routes)), labels)
-		ch <- newGaugeMetric(sc.descs.Gateways, float64(len(sm.Stats.Gateways)), labels)
-
-		ch <- newGaugeMetric(sc.descs.JetstreamInfo, float64(1), jetstreamInfoLabelValues(sm))
-		// Any / All Meta-data in sc.descs.JetstreamInfo can be xrefed by the server_id.
-		// labels define the "uniqueness" of a time series, any associations beyond that should be left to prometheus
-		lblServerID := []string{sm.Server.ID, sm.Server.Name, sm.Server.Cluster}
-		if sm.Stats.JetStream == nil {
-			ch <- newGaugeMetric(sc.descs.JetstreamEnabled, float64(0), lblServerID)
-		} else {
-			ch <- newGaugeMetric(sc.descs.JetstreamEnabled, float64(1), lblServerID)
-			if sm.Stats.JetStream.Config != nil {
-				ch <- newGaugeMetric(sc.descs.JetstreamFilestoreSizeBytes, float64(sm.Stats.JetStream.Config.MaxStore), lblServerID)
-				ch <- newGaugeMetric(sc.descs.JetstreamMemstoreSizeBytes, float64(sm.Stats.JetStream.Config.MaxMemory), lblServerID)
-				// StoreDir At present, '$SYS.REQ.SERVER.PING', server.sendStatsz() squashes StoreDir to "".
-				// Domain is also at 'sm.Server.Domain'. Unknown if there's a semantic difference at present. See jsDomainLabelValue().
-			}
-			if sm.Stats.JetStream.Stats != nil {
-				ch <- newGaugeMetric(sc.descs.JetstreamFilestoreUsedBytes, float64(sm.Stats.JetStream.Stats.Store), lblServerID)
-				ch <- newGaugeMetric(sc.descs.JetstreamFilestoreReservedBytes, float64(sm.Stats.JetStream.Stats.ReservedStore), lblServerID)
-				ch <- newGaugeMetric(sc.descs.JetstreamMemstoreUsedBytes, float64(sm.Stats.JetStream.Stats.Memory), lblServerID)
-				ch <- newGaugeMetric(sc.descs.JetstreamMemstoreReservedBytes, float64(sm.Stats.JetStream.Stats.ReservedMemory), lblServerID)
-				ch <- newGaugeMetric(sc.descs.JetstreamAccounts, float64(sm.Stats.JetStream.Stats.Accounts), lblServerID)
-				ch <- newGaugeMetric(sc.descs.JetstreamHAAssets, float64(sm.Stats.JetStream.Stats.HAAssets), lblServerID)
-				// NIT: Technically these should be Counters, not Gauges.
-				// At present, Total does not include Errors. Keeping them separate
-				ch <- newGaugeMetric(sc.descs.JetstreamAPIRequests, float64(sm.Stats.JetStream.Stats.API.Total), lblServerID)
-				ch <- newGaugeMetric(sc.descs.JetstreamAPIErrors, float64(sm.Stats.JetStream.Stats.API.Errors), lblServerID)
-			}
+		// poll the servers
+		if err := sc.poll(); err != nil {
+			sc.logger.Warnf("Error polling NATS server: %v", err)
+			sc.pollErrCnt.WithLabelValues().Inc()
+			metrics.newCounterMetric(sc.natsUp, 0, nil)
+			return metrics.metrics, nil
+		}
 
-			if sm.Stats.JetStream.Meta == nil {
-				ch <- newGaugeMetric(sc.descs.JetstreamClusterRaftGroupInfo, float64(0), []string{"", "", sm.Server.ID, serverName(sm), "", ""})
+		// lock the stats
+		sc.Lock()
+		defer sc.Unlock()
+
+		metrics.newCounterMetric(sc.natsUp, 1, nil)
+		sc.surveyedCnt.WithLabelValues().Set(0)
+
+		for _, sm := range sc.stats {
+			sc.surveyedCnt.WithLabelValues().Inc()
+
+			metrics.newGaugeMetric(sc.descs.Info, 1, sc.serverInfoLabelValues(sm))
+
+			labels := sc.serverLabelValues(sm)
+			metrics.newGaugeMetric(sc.descs.Start, float64(sm.Stats.Start.UnixNano()), labels)
+			metrics.newGaugeMetric(sc.descs.Uptime, time.Since(sm.Stats.Start).Seconds(), labels)
+			metrics.newGaugeMetric(sc.descs.Mem, float64(sm.Stats.Mem), labels)
+			metrics.newGaugeMetric(sc.descs.Cores, float64(sm.Stats.Cores), labels)
+			metrics.newGaugeMetric(sc.descs.CPU, sm.Stats.CPU, labels)
+			metrics.newGaugeMetric(sc.descs.Connections, float64(sm.Stats.Connections), labels)
+			metrics.newGaugeMetric(sc.descs.TotalConnections, float64(sm.Stats.TotalConnections), labels)
+			metrics.newGaugeMetric(sc.descs.ActiveAccounts, float64(sm.Stats.ActiveAccounts), labels)
+			metrics.newGaugeMetric(sc.descs.NumSubs, float64(sm.Stats.NumSubs), labels)
+			metrics.newGaugeMetric(sc.descs.SentMsgs, float64(sm.Stats.Sent.Msgs), labels)
+			metrics.newGaugeMetric(sc.descs.SentBytes, float64(sm.Stats.Sent.Bytes), labels)
+			metrics.newGaugeMetric(sc.descs.RecvMsgs, float64(sm.Stats.Received.Msgs), labels)
+			metrics.newGaugeMetric(sc.descs.RecvBytes, float64(sm.Stats.Received.Bytes), labels)
+			metrics.newGaugeMetric(sc.descs.SlowConsumers, float64(sm.Stats.SlowConsumers), labels)
+			metrics.newGaugeMetric(sc.descs.RTT, float64(sc.rtts[sm.Server.ID]), labels)
+			metrics.newGaugeMetric(sc.descs.Routes, float64(len(sm.Stats.Routes)), labels)
+			metrics.newGaugeMetric(sc.descs.Gateways, float64(len(sm.Stats.Gateways)), labels)
+
+			metrics.newGaugeMetric(sc.descs.JetstreamInfo, float64(1), jetstreamInfoLabelValues(sm))
+			// Any / All Meta-data in sc.descs.JetstreamInfo can be xrefed by the server_id.
+			// labels define the "uniqueness" of a time series, any associations beyond that should be left to prometheus
+			lblServerID := []string{sm.Server.ID, sm.Server.Name, sm.Server.Cluster}
+			if sm.Stats.JetStream == nil {
+				metrics.newGaugeMetric(sc.descs.JetstreamEnabled, float64(0), lblServerID)
 			} else {
-				jsRaftGroupInfoLabelValues := []string{jsDomainLabelValue(sm), "_meta_", sm.Server.ID, serverName(sm), sm.Stats.JetStream.Meta.Name, sm.Stats.JetStream.Meta.Leader}
-				ch <- newGaugeMetric(sc.descs.JetstreamClusterRaftGroupInfo, float64(1), jsRaftGroupInfoLabelValues)
+				metrics.newGaugeMetric(sc.descs.JetstreamEnabled, float64(1), lblServerID)
+				if sm.Stats.JetStream.Config != nil {
+					metrics.newGaugeMetric(sc.descs.JetstreamFilestoreSizeBytes, float64(sm.Stats.JetStream.Config.MaxStore), lblServerID)
+					metrics.newGaugeMetric(sc.descs.JetstreamMemstoreSizeBytes, float64(sm.Stats.JetStream.Config.MaxMemory), lblServerID)
+					// StoreDir At present, '$SYS.REQ.SERVER.PING', server.sendStatsz() squashes StoreDir to "".
+					// Domain is also at 'sm.Server.Domain'. Unknown if there's a semantic difference at present. See jsDomainLabelValue().
+				}
+				if sm.Stats.JetStream.Stats != nil {
+					metrics.newGaugeMetric(sc.descs.JetstreamFilestoreUsedBytes, float64(sm.Stats.JetStream.Stats.Store), lblServerID)
+					metrics.newGaugeMetric(sc.descs.JetstreamFilestoreReservedBytes, float64(sm.Stats.JetStream.Stats.ReservedStore), lblServerID)
+					metrics.newGaugeMetric(sc.descs.JetstreamMemstoreUsedBytes, float64(sm.Stats.JetStream.Stats.Memory), lblServerID)
+					metrics.newGaugeMetric(sc.descs.JetstreamMemstoreReservedBytes, float64(sm.Stats.JetStream.Stats.ReservedMemory), lblServerID)
+					metrics.newGaugeMetric(sc.descs.JetstreamAccounts, float64(sm.Stats.JetStream.Stats.Accounts), lblServerID)
+					metrics.newGaugeMetric(sc.descs.JetstreamHAAssets, float64(sm.Stats.JetStream.Stats.HAAssets), lblServerID)
+					// NIT: Technically these should be Counters, not Gauges.
+					// At present, Total does not include Errors. Keeping them separate
+					metrics.newGaugeMetric(sc.descs.JetstreamAPIRequests, float64(sm.Stats.JetStream.Stats.API.Total), lblServerID)
+					metrics.newGaugeMetric(sc.descs.JetstreamAPIErrors, float64(sm.Stats.JetStream.Stats.API.Errors), lblServerID)
+				}
 
-				jsRaftGroupLabelValues := []string{sm.Server.ID, serverName(sm), sm.Server.Cluster}
-				// FIXME: add labels needed or remove...
+				if sm.Stats.JetStream.Meta == nil {
+					metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupInfo, float64(0), []string{"", "", sm.Server.ID, serverName(sm), "", ""})
+				} else {
+					jsRaftGroupInfoLabelValues := []string{jsDomainLabelValue(sm), "_meta_", sm.Server.ID, serverName(sm), sm.Stats.JetStream.Meta.Name, sm.Stats.JetStream.Meta.Leader}
+					metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupInfo, float64(1), jsRaftGroupInfoLabelValues)
 
-				ch <- newGaugeMetric(sc.descs.JetstreamClusterRaftGroupSize, float64(sm.Stats.JetStream.Meta.Size), jsRaftGroupLabelValues)
+					jsRaftGroupLabelValues := []string{sm.Server.ID, serverName(sm), sm.Server.Cluster}
+					// FIXME: add labels needed or remove...
 
-				// Could provide false positive if two server have the same server_name in the same or different clusters in the super-cluster...
-				// At present, in this statsz only a peer that thinks it's a Leader will have `sm.Stats.JetStream.Meta.Replicas != nil`.
-				if sm.Stats.JetStream.Meta.Leader != "" && sm.Server.Name != "" && sm.Server.Name == sm.Stats.JetStream.Meta.Leader {
-					ch <- newGaugeMetric(sc.descs.JetstreamClusterRaftGroupLeader, float64(1), jsRaftGroupLabelValues)
-				} else {
-					ch <- newGaugeMetric(sc.descs.JetstreamClusterRaftGroupLeader, float64(0), jsRaftGroupLabelValues)
-				}
-				ch <- newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicas, float64(len(sm.Stats.JetStream.Meta.Replicas)), jsRaftGroupLabelValues)
-				for _, jsr := range sm.Stats.JetStream.Meta.Replicas {
-					if jsr == nil {
-						continue
-					}
-					jsClusterReplicaLabelValues := []string{sm.Server.ID, serverName(sm), jsr.Name, sm.Server.Cluster}
-					ch <- newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicaActive, float64(jsr.Active), jsClusterReplicaLabelValues)
-					if jsr.Current {
-						ch <- newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicaCurrent, float64(1), jsClusterReplicaLabelValues)
+					metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupSize, float64(sm.Stats.JetStream.Meta.Size), jsRaftGroupLabelValues)
+
+					// Could provide false positive if two server have the same server_name in the same or different clusters in the super-cluster...
+					// At present, in this statsz only a peer that thinks it's a Leader will have `sm.Stats.JetStream.Meta.Replicas != nil`.
+					if sm.Stats.JetStream.Meta.Leader != "" && sm.Server.Name != "" && sm.Server.Name == sm.Stats.JetStream.Meta.Leader {
+						metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupLeader, float64(1), jsRaftGroupLabelValues)
 					} else {
-						ch <- newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicaCurrent, float64(0), jsClusterReplicaLabelValues)
+						metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupLeader, float64(0), jsRaftGroupLabelValues)
 					}
-					if jsr.Offline {
-						ch <- newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicaOffline, float64(1), jsClusterReplicaLabelValues)
-					} else {
-						ch <- newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicaOffline, float64(0), jsClusterReplicaLabelValues)
+					metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicas, float64(len(sm.Stats.JetStream.Meta.Replicas)), jsRaftGroupLabelValues)
+					for _, jsr := range sm.Stats.JetStream.Meta.Replicas {
+						if jsr == nil {
+							continue
+						}
+						jsClusterReplicaLabelValues := []string{sm.Server.ID, serverName(sm), jsr.Name, sm.Server.Cluster}
+						metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicaActive, float64(jsr.Active), jsClusterReplicaLabelValues)
+						if jsr.Current {
+							metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicaCurrent, float64(1), jsClusterReplicaLabelValues)
+						} else {
+							metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicaCurrent, float64(0), jsClusterReplicaLabelValues)
+						}
+						if jsr.Offline {
+							metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicaOffline, float64(1), jsClusterReplicaLabelValues)
+						} else {
+							metrics.newGaugeMetric(sc.descs.JetstreamClusterRaftGroupReplicaOffline, float64(0), jsClusterReplicaLabelValues)
+						}
 					}
 				}
 			}
-		}
 
-		for _, rs := range sm.Stats.Routes {
-			labels = sc.routeLabelValues(sm, rs)
-			ch <- newGaugeMetric(sc.descs.RouteSentMsgs, float64(rs.Sent.Msgs), labels)
-			ch <- newGaugeMetric(sc.descs.RouteSentBytes, float64(rs.Sent.Bytes), labels)
-			ch <- newGaugeMetric(sc.descs.RouteRecvMsgs, float64(rs.Received.Msgs), labels)
-			ch <- newGaugeMetric(sc.descs.RouteRecvBytes, float64(rs.Received.Bytes), labels)
-			ch <- newGaugeMetric(sc.descs.RoutePending, float64(rs.Pending), labels)
-		}
+			for _, rs := range sm.Stats.Routes {
+				labels = sc.routeLabelValues(sm, rs)
+				metrics.newGaugeMetric(sc.descs.RouteSentMsgs, float64(rs.Sent.Msgs), labels)
+				metrics.newGaugeMetric(sc.descs.RouteSentBytes, float64(rs.Sent.Bytes), labels)
+				metrics.newGaugeMetric(sc.descs.RouteRecvMsgs, float64(rs.Received.Msgs), labels)
+				metrics.newGaugeMetric(sc.descs.RouteRecvBytes, float64(rs.Received.Bytes), labels)
+				metrics.newGaugeMetric(sc.descs.RoutePending, float64(rs.Pending), labels)
+			}
 
-		for _, gw := range sm.Stats.Gateways {
-			labels = sc.gatewayLabelValues(sm, gw)
-			ch <- newGaugeMetric(sc.descs.GatewaySentMsgs, float64(gw.Sent.Msgs), labels)
-			ch <- newGaugeMetric(sc.descs.GatewaySentBytes, float64(gw.Sent.Bytes), labels)
-			ch <- newGaugeMetric(sc.descs.GatewayRecvMsgs, float64(gw.Received.Msgs), labels)
-			ch <- newGaugeMetric(sc.descs.GatewayRecvBytes, float64(gw.Received.Bytes), labels)
-			ch <- newGaugeMetric(sc.descs.GatewayNumInbound, float64(gw.NumInbound), labels)
+			for _, gw := range sm.Stats.Gateways {
+				labels = sc.gatewayLabelValues(sm, gw)
+				metrics.newGaugeMetric(sc.descs.GatewaySentMsgs, float64(gw.Sent.Msgs), labels)
+				metrics.newGaugeMetric(sc.descs.GatewaySentBytes, float64(gw.Sent.Bytes), labels)
+				metrics.newGaugeMetric(sc.descs.GatewayRecvMsgs, float64(gw.Received.Msgs), labels)
+				metrics.newGaugeMetric(sc.descs.GatewayRecvBytes, float64(gw.Received.Bytes), labels)
+				metrics.newGaugeMetric(sc.descs.GatewayNumInbound, float64(gw.NumInbound), labels)
+			}
 		}
-	}
 
-	// Account scope metrics
-	if sc.collectAccounts {
-		ch <- newGaugeMetric(sc.descs.accCount, float64(len(sc.accStats)), nil)
-		for _, stat := range sc.accStats {
-			id := []string{stat.accountID}
-
-			ch <- newGaugeMetric(sc.descs.accConnCount, stat.connCount, id)
-			ch <- newGaugeMetric(sc.descs.accLeafCount, stat.leafCount, id)
-			ch <- newGaugeMetric(sc.descs.accSubCount, stat.subCount, id)
-
-			ch <- newCounterMetric(sc.descs.accBytesSent, stat.bytesSent, id)
-			ch <- newCounterMetric(sc.descs.accBytesRecv, stat.bytesRecv, id)
-			ch <- newCounterMetric(sc.descs.accMsgsSent, stat.msgsSent, id)
-			ch <- newCounterMetric(sc.descs.accMsgsRecv, stat.msgsRecv, id)
-
-			ch <- newGaugeMetric(sc.descs.accJetstreamEnabled, stat.jetstreamEnabled, id)
-			ch <- newGaugeMetric(sc.descs.accJetstreamMemoryUsed, stat.jetstreamMemoryUsed, id)
-			ch <- newGaugeMetric(sc.descs.accJetstreamStorageUsed, stat.jetstreamStorageUsed, id)
-			ch <- newGaugeMetric(sc.descs.accJetstreamMemoryReserved, stat.jetstreamMemoryReserved, id)
-			ch <- newGaugeMetric(sc.descs.accJetstreamStorageReserved, stat.jetstreamStorageReserved, id)
-			for tier, size := range stat.jetstreamTieredMemoryUsed {
-				ch <- newGaugeMetric(sc.descs.accJetstreamTieredMemoryUsed, size, append(id, fmt.Sprintf("R%d", tier)))
-			}
-			for tier, size := range stat.jetstreamTieredStorageUsed {
-				ch <- newGaugeMetric(sc.descs.accJetstreamTieredStorageUsed, size, append(id, fmt.Sprintf("R%d", tier)))
-			}
-			for tier, size := range stat.jetstreamTieredMemoryReserved {
-				ch <- newGaugeMetric(sc.descs.accJetstreamTieredMemoryReserved, size, append(id, fmt.Sprintf("R%d", tier)))
-			}
-			for tier, size := range stat.jetstreamTieredStorageReserved {
-				ch <- newGaugeMetric(sc.descs.accJetstreamTieredStorageReserved, size, append(id, fmt.Sprintf("R%d", tier)))
+		// Account scope metrics
+		if sc.collectAccounts {
+			metrics.newGaugeMetric(sc.descs.accCount, float64(len(sc.accStats)), nil)
+			for _, stat := range sc.accStats {
+				id := []string{stat.accountID}
+
+				metrics.newGaugeMetric(sc.descs.accConnCount, stat.connCount, id)
+				metrics.newGaugeMetric(sc.descs.accLeafCount, stat.leafCount, id)
+				metrics.newGaugeMetric(sc.descs.accSubCount, stat.subCount, id)
+
+				metrics.newCounterMetric(sc.descs.accBytesSent, stat.bytesSent, id)
+				metrics.newCounterMetric(sc.descs.accBytesRecv, stat.bytesRecv, id)
+				metrics.newCounterMetric(sc.descs.accMsgsSent, stat.msgsSent, id)
+				metrics.newCounterMetric(sc.descs.accMsgsRecv, stat.msgsRecv, id)
+
+				metrics.newGaugeMetric(sc.descs.accJetstreamEnabled, stat.jetstreamEnabled, id)
+				metrics.newGaugeMetric(sc.descs.accJetstreamMemoryUsed, stat.jetstreamMemoryUsed, id)
+				metrics.newGaugeMetric(sc.descs.accJetstreamStorageUsed, stat.jetstreamStorageUsed, id)
+				metrics.newGaugeMetric(sc.descs.accJetstreamMemoryReserved, stat.jetstreamMemoryReserved, id)
+				metrics.newGaugeMetric(sc.descs.accJetstreamStorageReserved, stat.jetstreamStorageReserved, id)
+				for tier, size := range stat.jetstreamTieredMemoryUsed {
+					metrics.newGaugeMetric(sc.descs.accJetstreamTieredMemoryUsed, size, append(id, fmt.Sprintf("R%d", tier)))
+				}
+				for tier, size := range stat.jetstreamTieredStorageUsed {
+					metrics.newGaugeMetric(sc.descs.accJetstreamTieredStorageUsed, size, append(id, fmt.Sprintf("R%d", tier)))
+				}
+				for tier, size := range stat.jetstreamTieredMemoryReserved {
+					metrics.newGaugeMetric(sc.descs.accJetstreamTieredMemoryReserved, size, append(id, fmt.Sprintf("R%d", tier)))
+				}
+				for tier, size := range stat.jetstreamTieredStorageReserved {
+					metrics.newGaugeMetric(sc.descs.accJetstreamTieredStorageReserved, size, append(id, fmt.Sprintf("R%d", tier)))
+				}
+
+				metrics.newGaugeMetric(sc.descs.accJetstreamStreamCount, stat.jetstreamStreamCount, id)
+				for _, streamStat := range stat.jetstreamStreams {
+					metrics.newGaugeMetric(sc.descs.accJetstreamConsumerCount, streamStat.consumerCount, append(id, streamStat.streamName))
+					metrics.newGaugeMetric(sc.descs.accJetstreamReplicaCount, streamStat.replicaCount, append(id, streamStat.streamName))
+				}
 			}
+		}
 
-			ch <- newGaugeMetric(sc.descs.accJetstreamStreamCount, stat.jetstreamStreamCount, id)
-			for _, streamStat := range stat.jetstreamStreams {
-				ch <- newGaugeMetric(sc.descs.accJetstreamConsumerCount, streamStat.consumerCount, append(id, streamStat.streamName))
-				ch <- newGaugeMetric(sc.descs.accJetstreamReplicaCount, streamStat.replicaCount, append(id, streamStat.streamName))
-			}
-		}
-	}
+		collectCh := make(chan prometheus.Metric)
+
+		// We want to collect these before we exit the flight group
+		// but they should still be sent to every caller
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			for m := range collectCh {
+				metrics.appendMetric(m)
+			}
+
+			wg.Done()
+		}()
+
+		timer.ObserveDuration()
+		sc.pollTime.Collect(collectCh)
+		sc.pollErrCnt.Collect(collectCh)
+		sc.surveyedCnt.Collect(collectCh)
+		sc.expectedCnt.Collect(collectCh)
+		sc.lateReplies.Collect(collectCh)
+		sc.noReplies.Collect(collectCh)
+
+		close(collectCh)
+
+		wg.Wait()
+
+		return metrics.metrics, nil
+	})
+	if err != nil {
+		sc.logger.Error(err)
+		return
+	}
+
+	if m, ok := result.([]prometheus.Metric); !ok || m == nil {
+		if m == nil {
+			sc.logger.Error("no metrics collected")
+		} else {
+			sc.logger.Error("unexpected collect response type")
+		}
+		return
+	}
+
+	for _, m := range result.([]prometheus.Metric) {
+		ch <- m
+	}
 }
 
 func requestMany(nc *nats.Conn, sc *StatzCollector, subject string, data []byte) ([]*nats.Msg, error) {
diff --git a/surveyor/surveyor.go b/surveyor/surveyor.go
index 17d8ee8..b536c27 100644
--- a/surveyor/surveyor.go
+++ b/surveyor/surveyor.go
@@ -281,32 +281,12 @@ func (s *Surveyor) httpAuthMiddleware(next http.Handler) http.Handler {
 	})
 }
 
-func (s *Surveyor) httpConcurrentPollBlockMiddleware(next http.Handler) http.Handler {
-	return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
-		sz := s.statzC
-
-		if sz == nil {
-			next.ServeHTTP(rw, r)
-			return
-		}
-
-		if sz.Polling() {
-			rw.WriteHeader(http.StatusServiceUnavailable)
-			rw.Write([]byte("Concurrent polls are not supported"))
-			s.logger.Warnln("Concurrent access detected and blocked")
-			return
-		}
-
-		next.ServeHTTP(rw, r)
-	})
-}
-
 // getScrapeHandler returns a chain of handlers that handles auth, concurency checks
 // and the prometheus handlers
 func (s *Surveyor) getScrapeHandler() http.Handler {
-	return s.httpAuthMiddleware(s.httpConcurrentPollBlockMiddleware(promhttp.InstrumentMetricHandler(
+	return s.httpAuthMiddleware(promhttp.InstrumentMetricHandler(
 		s.promRegistry, promhttp.HandlerFor(s.promRegistry, promhttp.HandlerOpts{}),
-	)))
+	))
 }
 
 // startHTTP configures and starts the HTTP server for applications to poll data from
diff --git a/surveyor/surveyor_test.go b/surveyor/surveyor_test.go
index 1bd4414..34d1eac 100644
--- a/surveyor/surveyor_test.go
+++ b/surveyor/surveyor_test.go
@@ -21,10 +21,12 @@ import (
 	"net/http"
 	"os"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/nats-io/nats.go"
+	"github.com/prometheus/common/expfmt"
 
 	st "github.com/nats-io/nats-surveyor/test"
 )
@@ -45,7 +47,7 @@ func httpGetSecure(url string) (*http.Response, error) {
 		return nil, err
 	}
 	transport := &http.Transport{TLSClientConfig: tlsConfig}
-	httpClient := &http.Client{Transport: transport, Timeout: 3 * time.Second}
+	httpClient := &http.Client{Transport: transport, Timeout: 5 * time.Second}
 	return httpClient.Get(url)
 }
 
@@ -604,11 +606,13 @@ func TestSurveyor_MissingResponses(t *testing.T) {
 	})
 }
 
-func TestSurveyor_ConcurrentBlock(t *testing.T) {
+func TestSurveyor_Concurrent(t *testing.T) {
 	sc := st.NewSuperCluster(t)
 	defer sc.Shutdown()
 
-	s, err := NewSurveyor(getTestOptions())
+	testOptions := getTestOptions()
+	testOptions.ExpectedServers = 3
+	s, err := NewSurveyor(testOptions)
 	if err != nil {
 		t.Fatalf("couldn't create surveyor: %v", err)
 	}
@@ -616,15 +620,49 @@ func TestSurveyor_Concurrent(t *testing.T) {
 	if err := s.Start(); err != nil {
 		t.Fatalf("start error: %v", err)
 	}
 	defer s.Stop()
+	metricFamily := "nats_core_mem_bytes"
+	results := make([]float64, 0)
+	mutex := sync.Mutex{}
+	var wg sync.WaitGroup
 
-	s.statzC.polling = true
-	_, err = PollSurveyorEndpoint(t, defaultSurveyorURL, false, http.StatusOK)
-	if err == nil {
-		t.Fatalf("Expected an error but none were encountered")
-	}
-
-	if err.Error() != "expected a 200 response, got 503" {
-		t.Fatalf("Expected 503 error but got: %v", err)
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			output, err := PollSurveyorEndpoint(t, defaultSurveyorURL, false, http.StatusOK)
+			if err != nil {
+				t.Errorf("%v", err)
+				return
+			}
+
+			metricsParser := expfmt.TextParser{}
+			metricFamilies, err := metricsParser.TextToMetricFamilies(strings.NewReader(output))
+			if err != nil {
+				t.Errorf("Error parsing metrics: %s", err)
+				return
+			}
+			metricFamily, found := metricFamilies[metricFamily]
+			if !found || len(metricFamily.Metric) == 0 {
+				t.Errorf("Missing expected metric")
+				return
+			}
+
+			value := metricFamily.Metric[0].GetGauge().GetValue()
+
+			mutex.Lock()
+			defer mutex.Unlock()
+			results = append(results, value)
+		}()
+	}
+
+	wg.Wait()
+
+	baseVal := results[0]
+
+	for _, v := range results {
+		if v != baseVal {
+			t.Fatalf("Expected all values to be the same")
+		}
 	}
 }
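
Editor's note — the following is not part of the patch above. The core idea of the change is to funnel every Prometheus scrape through a singleflight.Group keyed on "collect", so concurrent scrapes share a single poll of the NATS servers instead of being rejected with a 503 (the behavior of the removed httpConcurrentPollBlockMiddleware). Below is a minimal, self-contained sketch of that coalescing pattern; the collector type, collect method, and polls counter are hypothetical stand-ins for illustration, not code from this repository.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

// collector mimics the shape of StatzCollector: a single flight group
// guards the expensive poll of the server deployment.
type collector struct {
	flightGroup singleflight.Group
	polls       atomic.Int64 // counts how many times the poll actually ran
}

// collect coalesces concurrent callers: while one poll is in flight,
// later callers block on the same key and receive the shared result.
func (c *collector) collect() ([]string, error) {
	result, err, _ := c.flightGroup.Do("collect", func() (interface{}, error) {
		c.polls.Add(1)
		time.Sleep(100 * time.Millisecond) // stand-in for polling the servers
		return []string{"nats_core_mem_bytes 1024"}, nil
	})
	if err != nil {
		return nil, err
	}
	metrics, ok := result.([]string)
	if !ok {
		return nil, fmt.Errorf("unexpected collect response type")
	}
	return metrics, nil
}

func main() {
	c := &collector{}
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m, err := c.collect()
			fmt.Println(m, err)
		}()
	}
	wg.Wait()
	// All five scrapers saw identical output; the poll typically ran once.
	fmt.Println("polls executed:", c.polls.Load())
}

This mirrors what the new TestSurveyor_Concurrent asserts: five concurrent scrapes of the surveyor endpoint all report the same nats_core_mem_bytes value, because every caller of Collect that joins an in-flight poll receives the same metric slice back from flightGroup.Do.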