diff --git a/client/client_4.x.go b/client/client_4.x.go index 63a7fb5..6c2f2c0 100644 --- a/client/client_4.x.go +++ b/client/client_4.x.go @@ -3,19 +3,22 @@ package client import ( "emqx-exporter/collector" "fmt" - jsoniter "github.com/json-iterator/go" - "github.com/valyala/fasthttp" "net/http" "strconv" "strings" "time" + + jsoniter "github.com/json-iterator/go" + "github.com/valyala/fasthttp" ) var _ client = &cluster4x{} type cluster4x struct { - version string - client *fasthttp.Client + username string + password string + version string + client *fasthttp.Client } func (n *cluster4x) getVersion() string { @@ -30,8 +33,7 @@ func (n *cluster4x) getLicense() (lic *collector.LicenseInfo, err error) { } Code int }{} - - data, statusCode, err := callHTTPGet(n.client, "/api/v4/license") + data, statusCode, err := callHTTPGet(n.client, "/api/v4/license", n.username, n.password) if statusCode == http.StatusNotFound { // open source version doesn't support license api err = nil @@ -80,7 +82,7 @@ func (n *cluster4x) getClusterStatus() (cluster collector.ClusterStatus, err err } Code int }{} - err = callHTTPGetWithResp(n.client, "/api/v4/nodes", &resp) + err = callHTTPGetWithResp(n.client, "/api/v4/nodes", n.username, n.password, &resp) if err != nil { return } @@ -121,7 +123,7 @@ func (n *cluster4x) getBrokerMetrics() (metrics *collector.Broker, err error) { } Code int }{} - data, statusCode, err := callHTTPGet(n.client, "/api/v4/monitor/current_metrics") + data, statusCode, err := callHTTPGet(n.client, "/api/v4/monitor/current_metrics", n.username, n.password) if statusCode == http.StatusNotFound { // open source version doesn't support this api err = nil @@ -153,22 +155,22 @@ func (n *cluster4x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err resp := struct { Data []struct { Metrics []struct { - Node string `json:"node"` + Node string `json:"node"` SpeedMax float64 `json:"speed_max"` SpeedLast5m float64 `json:"speed_last5m"` Speed float64 `json:"speed"` - Matched int64 `json:"matched"` - Passed int64 `json:"passed"` - NoResult int64 `json:"no_result"` - Exception int64 `json:"exception"` - Failed int64 `json:"failed"` + Matched int64 `json:"matched"` + Passed int64 `json:"passed"` + NoResult int64 `json:"no_result"` + Exception int64 `json:"exception"` + Failed int64 `json:"failed"` } Actions []struct { Metrics []struct { Node string `json:"node"` - Taken int64 `json:"taken"` - Success int64 `json:"success"` - Failed int64 `json:"failed"` + Taken int64 `json:"taken"` + Success int64 `json:"success"` + Failed int64 `json:"failed"` } } ID string `json:"id"` @@ -176,7 +178,7 @@ func (n *cluster4x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err } Code int }{} - err = callHTTPGetWithResp(n.client, "/api/v4/rules?_limit=10000", &resp) + err = callHTTPGetWithResp(n.client, "/api/v4/rules?_limit=10000", n.username, n.password, &resp) if err != nil { return } @@ -226,7 +228,7 @@ func (n *cluster4x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err } func (n *cluster4x) getDataBridge() (bridges []collector.DataBridge, err error) { - bridgesResp := struct { + resp := struct { Data []struct { ID string `json:"id"` Type string @@ -234,13 +236,13 @@ func (n *cluster4x) getDataBridge() (bridges []collector.DataBridge, err error) } Code int }{} - err = callHTTPGetWithResp(n.client, "/api/v4/resources", &bridgesResp) + err = callHTTPGetWithResp(n.client, "/api/v4/resources", n.username, n.password, &resp) if err != nil { return } - bridges = make([]collector.DataBridge, len(bridgesResp.Data)) - for i, data := range bridgesResp.Data { + bridges = make([]collector.DataBridge, len(resp.Data)) + for i, data := range resp.Data { enabled := unhealthy if data.Status { enabled = healthy diff --git a/client/client_5.x.go b/client/client_5.x.go index 5a216b4..0eab335 100644 --- a/client/client_5.x.go +++ b/client/client_5.x.go @@ -3,17 +3,20 @@ package client import ( "emqx-exporter/collector" "fmt" - "github.com/valyala/fasthttp" "strconv" "time" + + "github.com/valyala/fasthttp" ) var _ client = &cluster5x{} type cluster5x struct { - version string - edition edition - client *fasthttp.Client + username string + password string + version string + edition edition + client *fasthttp.Client } func (n *cluster5x) getVersion() string { @@ -29,7 +32,7 @@ func (n *cluster5x) getLicense() (lic *collector.LicenseInfo, err error) { MaxConnections int64 `json:"max_connections"` ExpiryAt string `json:"expiry_at"` }{} - err = callHTTPGetWithResp(n.client, "/api/v5/license", &resp) + err = callHTTPGetWithResp(n.client, "/api/v5/license", n.username, n.password, &resp) if err != nil { return } @@ -60,7 +63,7 @@ func (n *cluster5x) getClusterStatus() (cluster collector.ClusterStatus, err err Load5 any `json:"load5"` Load15 any `json:"load15"` }{{}} - err = callHTTPGetWithResp(n.client, "/api/v5/nodes", &resp) + err = callHTTPGetWithResp(n.client, "/api/v5/nodes", n.username, n.password, &resp) if err != nil { return } @@ -106,7 +109,7 @@ func (n *cluster5x) getBrokerMetrics() (metrics *collector.Broker, err error) { SentMsgRate int64 `json:"sent_msg_rate"` ReceivedMsgRate int64 `json:"received_msg_rate"` }{} - err = callHTTPGetWithResp(n.client, "/api/v5/monitor_current", &resp) + err = callHTTPGetWithResp(n.client, "/api/v5/monitor_current", n.username, n.password, &resp) if err != nil { return } @@ -126,7 +129,7 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err Enable bool } }{} - err = callHTTPGetWithResp(n.client, "/api/v5/rules?limit=10000", &resp) + err = callHTTPGetWithResp(n.client, "/api/v5/rules?limit=10000", n.username, n.password, &resp) if err != nil { return } @@ -140,12 +143,12 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err NodeMetrics []struct { Node string Metrics struct { - Rate float64 `json:"matched.rate"` - RateLast5m float64 `json:"matched.rate.last5m"` - RateMax float64 `json:"matched.rate.max"` - Matched int64 - Passed int64 - Failed int64 + Rate float64 `json:"matched.rate"` + RateLast5m float64 `json:"matched.rate.last5m"` + RateMax float64 `json:"matched.rate.max"` + Matched int64 + Passed int64 + Failed int64 Exception int64 `json:"failed.exception"` NoResult int64 `json:"failed.no_result"` ActionTotal int64 `json:"actions.total"` @@ -154,7 +157,7 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err } } `json:"node_metrics"` }{} - err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/rules/%s/metrics", rule.ID), &metricsResp) + err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/rules/%s/metrics", rule.ID), n.username, n.password, &metricsResp) if err != nil { return } @@ -187,7 +190,7 @@ func (n *cluster5x) getDataBridge() (bridges []collector.DataBridge, err error) Type string Status string }{{}} - err = callHTTPGetWithResp(n.client, "/api/v5/bridges", &bridgesResp) + err = callHTTPGetWithResp(n.client, "/api/v5/bridges", n.username, n.password, &bridgesResp) if err != nil { return } @@ -211,7 +214,7 @@ func (n *cluster5x) getDataBridge() (bridges []collector.DataBridge, err error) Dropped int64 } }{} - err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/bridges/%s:%s/metrics", data.Type, data.Name), &metricsResp) + err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/bridges/%s:%s/metrics", data.Type, data.Name), n.username, n.password, &metricsResp) if err != nil { return } @@ -230,7 +233,7 @@ func (n *cluster5x) getAuthenticationMetrics() (dataSources []collector.DataSour Backend string Enable bool }{{}} - err = callHTTPGetWithResp(n.client, "/api/v5/authentication", &resp) + err = callHTTPGetWithResp(n.client, "/api/v5/authentication", n.username, n.password, &resp) if err != nil { return } @@ -254,7 +257,7 @@ func (n *cluster5x) getAuthenticationMetrics() (dataSources []collector.DataSour } `json:"node_metrics"` Status string }{} - err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authentication/%s/status", plugin.ID), &status) + err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authentication/%s/status", plugin.ID), n.username, n.password, &status) if err != nil { return } @@ -293,7 +296,7 @@ func (n *cluster5x) getAuthorizationMetrics() (dataSources []collector.DataSourc Enable bool } }{} - err = callHTTPGetWithResp(n.client, "/api/v5/authorization/sources", &resp) + err = callHTTPGetWithResp(n.client, "/api/v5/authorization/sources", n.username, n.password, &resp) if err != nil { return } @@ -317,7 +320,7 @@ func (n *cluster5x) getAuthorizationMetrics() (dataSources []collector.DataSourc } `json:"node_metrics"` Status string }{} - err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authorization/sources/%s/status", plugin.Type), &status) + err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authorization/sources/%s/status", plugin.Type), n.username, n.password, &status) if err != nil { return } diff --git a/client/cluster.go b/client/cluster.go index a974f22..118ddd1 100644 --- a/client/cluster.go +++ b/client/cluster.go @@ -3,86 +3,60 @@ package client import ( "context" "emqx-exporter/collector" + "emqx-exporter/config" "fmt" - "github.com/alecthomas/kingpin/v2" - "github.com/go-kit/log" - "github.com/go-kit/log/level" "strconv" - "strings" "sync" "time" -) -var ( - emqxNodes = kingpin.Flag("emqx.nodes", "The list of EMQX cluster node addr").Default("").String() - emqxUsername = kingpin.Flag("emqx.auth-username", "The username used for emqx api basic auth").Default("").String() - emqxPassword = kingpin.Flag("emqx.auth-password", "The password used for emqx api basic auth").Default("").String() + "github.com/go-kit/log" + "github.com/go-kit/log/level" ) type cluster struct { client client nodeLock sync.RWMutex - logger log.Logger } -func NewCluster(logger log.Logger) collector.Cluster { - addrs := strings.Split(*emqxNodes, ",") - if len(addrs) == 0 { - panic(fmt.Sprintf("Invalid emqx node addrs: %s", *emqxNodes)) - } - for _, addr := range addrs { - if !strings.ContainsRune(addr, ':') { - panic(fmt.Sprintf("Invalid emqx node addr: %s", addr)) +func NewCluster(metrics *config.Metrics, logger log.Logger) collector.Cluster { + c := &cluster{} + + go func() { + httpClient := getHTTPClient(metrics.Target) + for { + client4 := &cluster4x{ + username: metrics.APIKey, + password: metrics.APISecret, + client: httpClient, + } + if _, err := client4.getClusterStatus(); err != nil { + c.client = client4 + return + } + + client5 := &cluster5x{ + username: metrics.APIKey, + password: metrics.APISecret, + client: httpClient, + } + if _, err := client5.getClusterStatus(); err == nil { + c.client = client5 + return + } + + level.Error(logger).Log("msg", "Couldn't create cluster client, will retry it after 5 seconds", "err", "no cluster node found") + c.client = nil + + select { + case <-context.Background().Done(): + return + case <-time.After(5 * time.Second): + } } - } - - if *emqxUsername == "" { - panic("Missing username used for emqx api basic auth") - } - if *emqxPassword == "" { - panic("Missing password used for emqx api basic auth") - } - - c := &cluster{ - logger: logger, - } - go c.checkNodes() + }() return c } -func (c *cluster) checkNodes() { - httpClient := getHTTPClient(*emqxNodes) - var currentVersion string - for { - var client client - var err4, err5 error - client = &cluster4x{client: httpClient} - _, err4 = client.getClusterStatus() - if err4 != nil { - client = &cluster5x{client: httpClient} - _, err5 = client.getClusterStatus() - } - if err4 != nil && err5 != nil { - _ = level.Warn(c.logger).Log("check nodes", "couldn't get node info", "addr", *emqxNodes, - "err4", err4.Error(), "err5", err5.Error()) - client = nil - } else if currentVersion != client.getVersion() { - currentVersion = client.getVersion() - _ = level.Info(c.logger).Log("ClusterVersion", currentVersion) - } - - c.nodeLock.Lock() - c.client = client - c.nodeLock.Unlock() - - select { - case <-context.Background().Done(): - return - case <-time.After(5 * time.Second): - } - } -} - func (c *cluster) GetLicense() (lic *collector.LicenseInfo, err error) { client := c.getNode() if client == nil { @@ -102,7 +76,6 @@ func (c *cluster) GetLicense() (lic *collector.LicenseInfo, err error) { func (c *cluster) GetClusterStatus() (cluster collector.ClusterStatus, err error) { client := c.getNode() if client == nil { - cluster.Status = unknown return } cluster, err = client.getClusterStatus() diff --git a/client/handler.go b/client/handler.go new file mode 100644 index 0000000..29aafd3 --- /dev/null +++ b/client/handler.go @@ -0,0 +1,77 @@ +package client + +import ( + "emqx-exporter/collector" + "emqx-exporter/config" + + stdlog "log" + "net/http" + "sort" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + promcollectors "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/common/version" +) + +func NewHandler(disableExporterMetrics bool, maxRequests int, metrics *config.Metrics, logger log.Logger) http.Handler { + registry := prometheus.NewRegistry() + registry.MustRegister(version.NewCollector("emqx_exporter")) + + if metrics == nil { + level.Info(logger).Log("msg", "No metrics configured, skipping cluster metrics") + } else { + emqxCluster := NewCluster(metrics, logger) + nc, err := collector.NewEMQXCollector(emqxCluster, logger) + if err != nil { + level.Debug(logger).Log("msg", "Couldn't create collector", "err", err) + panic("Couldn't create collector") + } + + level.Info(logger).Log("msg", "Enabled collectors") + collectors := make([]string, 0, len(nc.Collectors)) + for n := range nc.Collectors { + collectors = append(collectors, n) + } + sort.Strings(collectors) + for _, c := range collectors { + level.Info(logger).Log("collector", c) + } + registry.MustRegister(nc) + } + + var h http.Handler + if disableExporterMetrics { + level.Info(logger).Log("msg", "Excluding metrics about the exporter itself") + h = promhttp.HandlerFor( + registry, + promhttp.HandlerOpts{ + ErrorLog: stdlog.New(log.NewStdlibAdapter(level.Error(logger)), "", 0), + ErrorHandling: promhttp.ContinueOnError, + MaxRequestsInFlight: maxRequests, + }) + } else { + level.Info(logger).Log("msg", "Including metrics about the exporter itself") + exporterMetricsRegistry := prometheus.NewRegistry() + exporterMetricsRegistry.MustRegister( + promcollectors.NewProcessCollector(promcollectors.ProcessCollectorOpts{}), + promcollectors.NewGoCollector(), + ) + + h = promhttp.HandlerFor( + prometheus.Gatherers{exporterMetricsRegistry, registry}, + promhttp.HandlerOpts{ + ErrorLog: stdlog.New(log.NewStdlibAdapter(level.Error(logger)), "", 0), + ErrorHandling: promhttp.ContinueOnError, + MaxRequestsInFlight: maxRequests, + Registry: exporterMetricsRegistry, + }) + h = promhttp.InstrumentMetricHandler( + exporterMetricsRegistry, h, + ) + } + + return h +} diff --git a/client/utils.go b/client/utils.go index 10a4b93..3b30788 100644 --- a/client/utils.go +++ b/client/utils.go @@ -4,20 +4,17 @@ import ( "encoding/base64" "errors" "fmt" - "github.com/alecthomas/kingpin/v2" - jsoniter "github.com/json-iterator/go" - "github.com/valyala/fasthttp" "net/http" "net/netip" "strings" "time" -) -var ( - readTimeout = kingpin.Flag("emqx.readTimeout", "Maximum seconds for full response reading (including body)").Default("5").Int() - connWaitTimeout = kingpin.Flag("emqx.connWaitTimeout", "Maximum seconds for waiting for a free connection").Default("5").Int() + jsoniter "github.com/json-iterator/go" + "github.com/valyala/fasthttp" ) +var () + func cutNodeName(nodeName string) string { slice := strings.Split(nodeName, "@") if len(slice) != 2 { @@ -37,9 +34,9 @@ func getHTTPClient(host string) *fasthttp.Client { Name: "EMQX-Exporter", //User-Agent MaxConnsPerHost: 5, MaxIdleConnDuration: 30 * time.Second, - ReadTimeout: time.Duration(*readTimeout) * time.Second, + ReadTimeout: 5 * time.Second, WriteTimeout: 5 * time.Second, - MaxConnWaitTimeout: time.Duration(*connWaitTimeout) * time.Second, + MaxConnWaitTimeout: 5 * time.Second, ConfigureClient: func(hc *fasthttp.HostClient) error { hc.Addr = host return nil @@ -47,14 +44,17 @@ func getHTTPClient(host string) *fasthttp.Client { } } -func callHTTPGet(client *fasthttp.Client, uri string) (data []byte, statusCode int, err error) { +func callHTTPGet(client *fasthttp.Client, uri, username, password string) (data []byte, statusCode int, err error) { req := fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) req.SetRequestURI(uri) - req.SetHost("localhost") req.Header.SetMethod(http.MethodGet) - req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(*emqxUsername+":"+*emqxPassword))) + req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password))) + // for fasthttp, must set host, otherwise will panic + // but it doesn't matter what value is set + // the host will be replaced by the real host in fasthttp.Client.ConfigureClient + req.Header.SetHost("foo.bar") resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) @@ -106,13 +106,12 @@ func callHTTPGet(client *fasthttp.Client, uri string) (data []byte, statusCode i return } -func callHTTPGetWithResp(client *fasthttp.Client, uri string, respData interface{}) (err error) { - data, _, err := callHTTPGet(client, uri) +func callHTTPGetWithResp(client *fasthttp.Client, uri, username, password string, respData interface{}) (err error) { + data, _, err := callHTTPGet(client, uri, username, password) if err != nil { return } - //fmt.Println("data:", string(data)) err = jsoniter.Unmarshal(data, respData) if err != nil { err = fmt.Errorf("unmarshal api resp failed: %s, %s", uri, err.Error()) diff --git a/config/config.go b/config/config.go index 2b3d559..1873edb 100644 --- a/config/config.go +++ b/config/config.go @@ -12,7 +12,14 @@ import ( ) type Config struct { - Probes []Probe `yaml:"probes"` + Metrics *Metrics `yaml:"metrics,omitempty"` + Probes []Probe `yaml:"probes,omitempty"` +} + +type Metrics struct { + Target string `yaml:"target"` + APIKey string `yaml:"api_key"` + APISecret string `yaml:"api_secret"` } type Probe struct { @@ -70,7 +77,23 @@ func (sc *SafeConfig) ReloadConfig(confFile string) (err error) { return fmt.Errorf("error parsing config file: %s", err) } + if c.Metrics != nil { + if c.Metrics.Target == "" { + return fmt.Errorf("metrics.target is required") + } + if c.Metrics.APIKey == "" { + return fmt.Errorf("metrics.api_key is required") + } + + if c.Metrics.APISecret == "" { + return fmt.Errorf("metrics.api_secret is required") + } + } + for index, probe := range c.Probes { + if probe.Target == "" { + return fmt.Errorf("probes[%d].target is required", index) + } if probe.Scheme == "" { probe.Scheme = "tcp" } @@ -78,7 +101,7 @@ func (sc *SafeConfig) ReloadConfig(confFile string) (err error) { probe.ClientID = "emqx_exporter_probe" } if probe.Topic == "" { - probe.Topic = "emqx_exporter_probe" + probe.Topic = "emqx-exporter-probe" } c.Probes[index] = probe } diff --git a/config/example/config.yaml b/config/example/config.yaml index d63cf32..c338c1d 100644 --- a/config/example/config.yaml +++ b/config/example/config.yaml @@ -1,8 +1,15 @@ +metrics: + ## EMQX API + target: 127.0.0.1:18083 + ## EMQX API key + api_key: "76668f8a2003d597" + ## EMQX API secret + api_secret: "CRCDB6lxxzN58e5HoD82llBC0Erg1TVZIAUsdTjPU7N" probes: - - target: broker.emqx.io:1883 - scheme: + - target: broker.emqx.io:1883 ## MQTT broker address + scheme: ## tcp, default is tcp client_id: username: password: topic: - qos: + qos: \ No newline at end of file diff --git a/examples/docker-compose/docker-compose.yml b/examples/docker-compose/docker-compose.yml index d59749e..07f76d3 100644 --- a/examples/docker-compose/docker-compose.yml +++ b/examples/docker-compose/docker-compose.yml @@ -25,13 +25,8 @@ services: - emqx image: emqx-exporter container_name: exporter-demo - ports: - - 8085:8085 - command: - # the username and password is defined in the file api_secret - - '--emqx.nodes=emqx-demo:18083' - - '--emqx.auth-username=76668f8a2003d597' - - '--emqx.auth-password=CRCDB6lxxzN58e5HoD82llBC0Erg1TVZIAUsdTjPU7N' + # ports: + # - 8085:8085 volumes: - ./emqx-exporter.config.yaml:/etc/emqx-exporter/config.yaml diff --git a/examples/docker-compose/emqx-exporter.config.yaml b/examples/docker-compose/emqx-exporter.config.yaml index ca2d3cb..eb65921 100644 --- a/examples/docker-compose/emqx-exporter.config.yaml +++ b/examples/docker-compose/emqx-exporter.config.yaml @@ -1,2 +1,6 @@ +metrics: + target: emqx-demo:18083 + api_key: 76668f8a2003d597 + api_secret: CRCDB6lxxzN58e5HoD82llBC0Erg1TVZIAUsdTjPU7N probes: - target: emqx-demo:1883 diff --git a/main.go b/main.go index 3b5b9a6..29b117c 100644 --- a/main.go +++ b/main.go @@ -18,127 +18,30 @@ import ( "emqx-exporter/config" "emqx-exporter/prober" - "fmt" - stdlog "log" "net/http" _ "net/http/pprof" "os" - "os/user" "runtime" - "sort" "github.com/alecthomas/kingpin/v2" - promcollectors "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/common/promlog" "github.com/prometheus/common/promlog/flag" "gopkg.in/yaml.v2" - "emqx-exporter/collector" - - "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/version" "github.com/prometheus/exporter-toolkit/web" "github.com/prometheus/exporter-toolkit/web/kingpinflag" ) -// handler wraps an unfiltered http.Handler but uses a filtered handler, -// created on the fly, if filtering is requested. Create instances with -// newHandler. -type handler struct { - // exporterMetricsRegistry is a separate registry for the metrics about - // the exporter itself. - exporterMetricsRegistry *prometheus.Registry - includeExporterMetrics bool - maxRequests int - logger log.Logger -} - -func (h *handler) innerHandler() (http.Handler, error) { - emqxCluster := client.NewCluster(h.logger) - nc, err := collector.NewEMQXCollector(emqxCluster, h.logger) - if err != nil { - return nil, fmt.Errorf("couldn't create collector: %s", err) - } - - level.Info(h.logger).Log("msg", "Enabled collectors") - collectors := make([]string, 0, len(nc.Collectors)) - for n := range nc.Collectors { - collectors = append(collectors, n) - } - sort.Strings(collectors) - for _, c := range collectors { - level.Info(h.logger).Log("collector", c) - } - - r := prometheus.NewRegistry() - r.MustRegister(version.NewCollector("emqx_exporter")) - if err := r.Register(nc); err != nil { - return nil, fmt.Errorf("couldn't register node collector: %s", err) - } - handler := promhttp.HandlerFor( - prometheus.Gatherers{h.exporterMetricsRegistry, r}, - promhttp.HandlerOpts{ - ErrorLog: stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0), - ErrorHandling: promhttp.ContinueOnError, - MaxRequestsInFlight: h.maxRequests, - Registry: h.exporterMetricsRegistry, - }, - ) - if h.includeExporterMetrics { - h.exporterMetricsRegistry.MustRegister( - promcollectors.NewProcessCollector(promcollectors.ProcessCollectorOpts{}), - promcollectors.NewGoCollector(), - ) - // Note that we have to use h.exporterMetricsRegistry here to - // use the same promhttp metrics for all expositions. - handler = promhttp.InstrumentMetricHandler( - h.exporterMetricsRegistry, handler, - ) - } - return handler, nil -} - -func newHandler(includeExporterMetrics bool, maxRequests int, logger log.Logger) http.Handler { - h := &handler{ - exporterMetricsRegistry: prometheus.NewRegistry(), - includeExporterMetrics: includeExporterMetrics, - maxRequests: maxRequests, - logger: logger, - } - if innerHandler, err := h.innerHandler(); err != nil { - panic(fmt.Sprintf("Couldn't create metrics handler: %s", err)) - } else { - return innerHandler - } -} - -var ( - sc = config.NewSafeConfig(prometheus.DefaultRegisterer) - - configFile = kingpin.Flag("config.file", "EMQX exporter configuration file.").Default("/etc/emqx-exporter/config.yaml").String() -) - -func init() { - prometheus.MustRegister(version.NewCollector("emqx_exporter")) -} - func main() { var ( - disableExporterMetrics = kingpin.Flag( - "web.disable-exporter-metrics", - "Exclude metrics about the exporter itself (promhttp_*, process_*, go_*).", - ).Bool() - maxRequests = kingpin.Flag( - "web.max-requests", - "Maximum number of parallel scrape requests. Use 0 to disable.", - ).Default("40").Int() - maxProcs = kingpin.Flag( - "runtime.gomaxprocs", "The target number of CPUs Go will run on (GOMAXPROCS)", - ).Envar("GOMAXPROCS").Default("1").Int() - toolkitFlags = kingpinflag.AddFlags(kingpin.CommandLine, ":8085") + configFile = kingpin.Flag("config.file", "EMQX exporter configuration file.").Default("/etc/emqx-exporter/config.yaml").String() + maxProcs = kingpin.Flag("runtime.gomaxprocs", "The target number of CPUs Go will run on (GOMAXPROCS)").Envar("GOMAXPROCS").Default("4").Int() + maxRequests = kingpin.Flag("web.max-requests", "Maximum number of parallel scrape requests. Use 0 to disable.").Default("40").Int() + disableExporterMetrics = kingpin.Flag("web.disable-exporter-metrics", "Exclude metrics about the exporter itself (promhttp_*, process_*, go_*).").Bool() + toolkitFlags = kingpinflag.AddFlags(kingpin.CommandLine, ":8085") ) promlogConfig := &promlog.Config{} @@ -152,19 +55,18 @@ func main() { level.Info(logger).Log("msg", "Starting emqx-exporter", "version", version.Info()) level.Info(logger).Log("msg", "Build context", "build_context", version.BuildContext()) + // GOMAXPROCS returns the previous setting. If n < 1, it does not change the current setting. + runtime.GOMAXPROCS(*maxProcs) + level.Debug(logger).Log("msg", "Go MAXPROCS", "procs", runtime.GOMAXPROCS(0)) + + sc := config.NewSafeConfig(prometheus.DefaultRegisterer) if err := sc.ReloadConfig(*configFile); err != nil { level.Error(logger).Log("msg", "Error loading config", "err", err) os.Exit(1) } level.Info(logger).Log("msg", "Loaded config file") - if user, err := user.Current(); err == nil && user.Uid == "0" { - level.Warn(logger).Log("msg", "EMQX Exporter is running as root user. This exporter is designed to run as unprivileged user, root is not required.") - } - runtime.GOMAXPROCS(*maxProcs) - level.Debug(logger).Log("msg", "Go MAXPROCS", "procs", runtime.GOMAXPROCS(0)) - - http.Handle("/metrics", newHandler(!*disableExporterMetrics, *maxRequests, logger)) + http.Handle("/metrics", client.NewHandler(*disableExporterMetrics, *maxRequests, sc.C.Metrics, logger)) http.HandleFunc("/probe", func(w http.ResponseWriter, r *http.Request) { sc.Lock() @@ -212,9 +114,9 @@ func main() { } http.Handle("/", landingPage) - server := &http.Server{} - if err := web.ListenAndServe(server, toolkitFlags, logger); err != nil { - level.Error(logger).Log("err", err) + srv := &http.Server{} + if err := web.ListenAndServe(srv, toolkitFlags, logger); err != nil { + level.Error(logger).Log("msg", "Error starting HTTP server", "err", err) os.Exit(1) } } diff --git a/prober/handler.go b/prober/handler.go index fb11dc7..0560589 100644 --- a/prober/handler.go +++ b/prober/handler.go @@ -2,6 +2,7 @@ package prober import ( "emqx-exporter/config" + "fmt" "net/http"