From 985ecdef698f46d968ed6184ec69fb39fd15b383 Mon Sep 17 00:00:00 2001 From: panfeng Date: Fri, 18 Aug 2023 17:31:42 +0800 Subject: [PATCH 1/2] sync vendor dir --- .../client_golang/prometheus/push/push.go | 320 ++++++++++++++++++ vendor/modules.txt | 1 + 2 files changed, 321 insertions(+) create mode 100644 vendor/github.com/prometheus/client_golang/prometheus/push/push.go diff --git a/vendor/github.com/prometheus/client_golang/prometheus/push/push.go b/vendor/github.com/prometheus/client_golang/prometheus/push/push.go new file mode 100644 index 00000000000..c1a6cb99fcb --- /dev/null +++ b/vendor/github.com/prometheus/client_golang/prometheus/push/push.go @@ -0,0 +1,320 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package push provides functions to push metrics to a Pushgateway. It uses a +// builder approach. Create a Pusher with New and then add the various options +// by using its methods, finally calling Add or Push, like this: +// +// // Easy case: +// push.New("http://example.org/metrics", "my_job").Gatherer(myRegistry).Push() +// +// // Complex case: +// push.New("http://example.org/metrics", "my_job"). +// Collector(myCollector1). +// Collector(myCollector2). +// Grouping("zone", "xy"). +// Client(&myHTTPClient). +// BasicAuth("top", "secret"). +// Add() +// +// See the examples section for more detailed examples. +// +// See the documentation of the Pushgateway to understand the meaning of +// the grouping key and the differences between Push and Add: +// https://github.com/prometheus/pushgateway +package push + +import ( + "bytes" + "encoding/base64" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + contentTypeHeader = "Content-Type" + // base64Suffix is appended to a label name in the request URL path to + // mark the following label value as base64 encoded. + base64Suffix = "@base64" +) + +var errJobEmpty = errors.New("job name is empty") + +// HTTPDoer is an interface for the one method of http.Client that is used by Pusher +type HTTPDoer interface { + Do(*http.Request) (*http.Response, error) +} + +// Pusher manages a push to the Pushgateway. Use New to create one, configure it +// with its methods, and finally use the Add or Push method to push. +type Pusher struct { + error error + + url, job string + grouping map[string]string + + gatherers prometheus.Gatherers + registerer prometheus.Registerer + + client HTTPDoer + useBasicAuth bool + username, password string + + expfmt expfmt.Format +} + +// New creates a new Pusher to push to the provided URL with the provided job +// name (which must not be empty). You can use just host:port or ip:port as url, +// in which case “http://” is added automatically. Alternatively, include the +// schema in the URL. However, do not include the “/metrics/jobs/…” part. +func New(url, job string) *Pusher { + var ( + reg = prometheus.NewRegistry() + err error + ) + if job == "" { + err = errJobEmpty + } + if !strings.Contains(url, "://") { + url = "http://" + url + } + if strings.HasSuffix(url, "/") { + url = url[:len(url)-1] + } + + return &Pusher{ + error: err, + url: url, + job: job, + grouping: map[string]string{}, + gatherers: prometheus.Gatherers{reg}, + registerer: reg, + client: &http.Client{}, + expfmt: expfmt.FmtProtoDelim, + } +} + +// Push collects/gathers all metrics from all Collectors and Gatherers added to +// this Pusher. Then, it pushes them to the Pushgateway configured while +// creating this Pusher, using the configured job name and any added grouping +// labels as grouping key. All previously pushed metrics with the same job and +// other grouping labels will be replaced with the metrics pushed by this +// call. (It uses HTTP method “PUT” to push to the Pushgateway.) +// +// Push returns the first error encountered by any method call (including this +// one) in the lifetime of the Pusher. +func (p *Pusher) Push() error { + return p.push(http.MethodPut) +} + +// Add works like push, but only previously pushed metrics with the same name +// (and the same job and other grouping labels) will be replaced. (It uses HTTP +// method “POST” to push to the Pushgateway.) +func (p *Pusher) Add() error { + return p.push(http.MethodPost) +} + +// Gatherer adds a Gatherer to the Pusher, from which metrics will be gathered +// to push them to the Pushgateway. The gathered metrics must not contain a job +// label of their own. +// +// For convenience, this method returns a pointer to the Pusher itself. +func (p *Pusher) Gatherer(g prometheus.Gatherer) *Pusher { + p.gatherers = append(p.gatherers, g) + return p +} + +// Collector adds a Collector to the Pusher, from which metrics will be +// collected to push them to the Pushgateway. The collected metrics must not +// contain a job label of their own. +// +// For convenience, this method returns a pointer to the Pusher itself. +func (p *Pusher) Collector(c prometheus.Collector) *Pusher { + if p.error == nil { + p.error = p.registerer.Register(c) + } + return p +} + +// Grouping adds a label pair to the grouping key of the Pusher, replacing any +// previously added label pair with the same label name. Note that setting any +// labels in the grouping key that are already contained in the metrics to push +// will lead to an error. +// +// For convenience, this method returns a pointer to the Pusher itself. +func (p *Pusher) Grouping(name, value string) *Pusher { + if p.error == nil { + if !model.LabelName(name).IsValid() { + p.error = fmt.Errorf("grouping label has invalid name: %s", name) + return p + } + p.grouping[name] = value + } + return p +} + +// Client sets a custom HTTP client for the Pusher. For convenience, this method +// returns a pointer to the Pusher itself. +// Pusher only needs one method of the custom HTTP client: Do(*http.Request). +// Thus, rather than requiring a fully fledged http.Client, +// the provided client only needs to implement the HTTPDoer interface. +// Since *http.Client naturally implements that interface, it can still be used normally. +func (p *Pusher) Client(c HTTPDoer) *Pusher { + p.client = c + return p +} + +// BasicAuth configures the Pusher to use HTTP Basic Authentication with the +// provided username and password. For convenience, this method returns a +// pointer to the Pusher itself. +func (p *Pusher) BasicAuth(username, password string) *Pusher { + p.useBasicAuth = true + p.username = username + p.password = password + return p +} + +// Format configures the Pusher to use an encoding format given by the +// provided expfmt.Format. The default format is expfmt.FmtProtoDelim and +// should be used with the standard Prometheus Pushgateway. Custom +// implementations may require different formats. For convenience, this +// method returns a pointer to the Pusher itself. +func (p *Pusher) Format(format expfmt.Format) *Pusher { + p.expfmt = format + return p +} + +// Delete sends a “DELETE” request to the Pushgateway configured while creating +// this Pusher, using the configured job name and any added grouping labels as +// grouping key. Any added Gatherers and Collectors added to this Pusher are +// ignored by this method. +// +// Delete returns the first error encountered by any method call (including this +// one) in the lifetime of the Pusher. +func (p *Pusher) Delete() error { + if p.error != nil { + return p.error + } + req, err := http.NewRequest(http.MethodDelete, p.fullURL(), nil) + if err != nil { + return err + } + if p.useBasicAuth { + req.SetBasicAuth(p.username, p.password) + } + resp, err := p.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusAccepted { + body, _ := ioutil.ReadAll(resp.Body) // Ignore any further error as this is for an error message only. + return fmt.Errorf("unexpected status code %d while deleting %s: %s", resp.StatusCode, p.fullURL(), body) + } + return nil +} + +func (p *Pusher) push(method string) error { + if p.error != nil { + return p.error + } + mfs, err := p.gatherers.Gather() + if err != nil { + return err + } + buf := &bytes.Buffer{} + enc := expfmt.NewEncoder(buf, p.expfmt) + // Check for pre-existing grouping labels: + for _, mf := range mfs { + for _, m := range mf.GetMetric() { + for _, l := range m.GetLabel() { + if l.GetName() == "job" { + return fmt.Errorf("pushed metric %s (%s) already contains a job label", mf.GetName(), m) + } + if _, ok := p.grouping[l.GetName()]; ok { + return fmt.Errorf( + "pushed metric %s (%s) already contains grouping label %s", + mf.GetName(), m, l.GetName(), + ) + } + } + } + enc.Encode(mf) + } + req, err := http.NewRequest(method, p.fullURL(), buf) + if err != nil { + return err + } + if p.useBasicAuth { + req.SetBasicAuth(p.username, p.password) + } + req.Header.Set(contentTypeHeader, string(p.expfmt)) + resp, err := p.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + // Depending on version and configuration of the PGW, StatusOK or StatusAccepted may be returned. + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + body, _ := ioutil.ReadAll(resp.Body) // Ignore any further error as this is for an error message only. + return fmt.Errorf("unexpected status code %d while pushing to %s: %s", resp.StatusCode, p.fullURL(), body) + } + return nil +} + +// fullURL assembles the URL used to push/delete metrics and returns it as a +// string. The job name and any grouping label values containing a '/' will +// trigger a base64 encoding of the affected component and proper suffixing of +// the preceding component. Similarly, an empty grouping label value will be +// encoded as base64 just with a single `=` padding character (to avoid an empty +// path component). If the component does not contain a '/' but other special +// characters, the usual url.QueryEscape is used for compatibility with older +// versions of the Pushgateway and for better readability. +func (p *Pusher) fullURL() string { + urlComponents := []string{} + if encodedJob, base64 := encodeComponent(p.job); base64 { + urlComponents = append(urlComponents, "job"+base64Suffix, encodedJob) + } else { + urlComponents = append(urlComponents, "job", encodedJob) + } + for ln, lv := range p.grouping { + if encodedLV, base64 := encodeComponent(lv); base64 { + urlComponents = append(urlComponents, ln+base64Suffix, encodedLV) + } else { + urlComponents = append(urlComponents, ln, encodedLV) + } + } + return fmt.Sprintf("%s/metrics/%s", p.url, strings.Join(urlComponents, "/")) +} + +// encodeComponent encodes the provided string with base64.RawURLEncoding in +// case it contains '/' and as "=" in case it is empty. If neither is the case, +// it uses url.QueryEscape instead. It returns true in the former two cases. +func encodeComponent(s string) (string, bool) { + if s == "" { + return "=", true + } + if strings.Contains(s, "/") { + return base64.RawURLEncoding.EncodeToString([]byte(s)), true + } + return url.QueryEscape(s), false +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 75b6837e338..7142154e496 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -798,6 +798,7 @@ github.com/prometheus/client_golang/prometheus github.com/prometheus/client_golang/prometheus/collectors github.com/prometheus/client_golang/prometheus/internal github.com/prometheus/client_golang/prometheus/promhttp +github.com/prometheus/client_golang/prometheus/push github.com/prometheus/client_golang/prometheus/testutil github.com/prometheus/client_golang/prometheus/testutil/promlint # github.com/prometheus/client_model v0.2.0 From 6d89f4742aaec1ea29f088998339a116ee4b0841 Mon Sep 17 00:00:00 2001 From: panfeng Date: Fri, 18 Aug 2023 17:35:21 +0800 Subject: [PATCH 2/2] push monitor data to prometheus pushgateway --- cloud/pkg/common/monitor/monitor.go | 20 +++++++++++++++++++ common/constants/default.go | 2 ++ .../cloudcore/v1alpha1/default.go | 7 ++++++- .../cloudcore/v1alpha1/types.go | 17 +++++++++++++++- .../edgecore/v1alpha2/default.go | 6 +++--- 5 files changed, 47 insertions(+), 5 deletions(-) diff --git a/cloud/pkg/common/monitor/monitor.go b/cloud/pkg/common/monitor/monitor.go index 7f7c900c25d..af233e92fe2 100644 --- a/cloud/pkg/common/monitor/monitor.go +++ b/cloud/pkg/common/monitor/monitor.go @@ -24,7 +24,9 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/push" "github.com/prometheus/client_golang/prometheus/promhttp" + mikunode "github.com/qbox/mikud-live/common/node" "k8s.io/klog/v2" beehivecontext "github.com/kubeedge/beehive/pkg/core/context" @@ -68,6 +70,22 @@ func InstallHandlerForPProf(mux *http.ServeMux) { mux.HandleFunc("/debug/pprof/trace", pprof.Trace) } +func loopPrometheusPush(config config.MonitorServer) { + conf := config.Prometheus + if conf.IntervalS <= 0 { + conf.IntervalS = 10 + } + for range time.Tick(time.Duration(conf.IntervalS) * time.Second) { + err := push.New(conf.Server, conf.Job). + Collector(ConnectedNodes). + Grouping("instance", mikunode.GetNodeId()). + Add(); + if err != nil { + klog.Errorf("prometheus push failed:%v", err) + } + } +} + // ServeMonitor serve monitoring metric. func ServeMonitor(config config.MonitorServer) { registerMetrics() @@ -95,6 +113,8 @@ func ServeMonitor(config config.MonitorServer) { } }() + go loopPrometheusPush(config) + klog.Infof("starting monitor server on addr: %s", config.BindAddress) klog.Exit(s.ListenAndServe()) } diff --git a/common/constants/default.go b/common/constants/default.go index bbc273192d2..875d03d220a 100644 --- a/common/constants/default.go +++ b/common/constants/default.go @@ -79,6 +79,8 @@ const ( DefaultTunnelPort = 10004 DefaultClusterDomain = "cluster.local" + // monitor server + DefaultJobName = "connected_node_count" // appsd DefaultSupervisordEndpoint = "/tmp/supervisor.sock" DefaultSupervisordConfDir = "/etc/supervisord" diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go index 5191242e7fa..1ccc625a76a 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go @@ -37,8 +37,13 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { CommonConfig: &CommonConfig{ TunnelPort: constants.ServerPort, MonitorServer: MonitorServer{ - BindAddress: "127.0.0.1:9091", + BindAddress: "127.0.0.1:9001", EnableProfiling: false, + Prometheus: Prometheus{ + Server: "127.0.0.1:9091", + IntervalS: 10, + Job: constants.DefaultJobName, + }, }, }, KubeAPIConfig: &KubeAPIConfig{ diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go index 6950d30254a..4fa2422e29c 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go @@ -50,12 +50,27 @@ type CommonConfig struct { // MonitorServer indicates MonitorServer config type MonitorServer struct { // BindAddress is the IP address and port for the monitor server to serve on, - // defaulting to 127.0.0.1:9091 (set to 0.0.0.0 for all interfaces) + // defaulting to 127.0.0.1:9001 (set to 0.0.0.0 for all interfaces) BindAddress string `json:"bindAddress,omitempty"` // EnableProfiling enables profiling via web interface on /debug/pprof handler. // Profiling handlers will be handled by monitor server. EnableProfiling bool `json:"enableProfiling,omitempty"` + + // prometheus is the config for the monitor metric data to push, + Prometheus Prometheus `json:"prometheus,omitempty"` +} + +type Prometheus struct { + // Server is the IP address and port for the metric data to push, + // defaulting to 127.0.0.1:9091 (set to 0.0.0.0 for all interfaces) + Server string `json:"server,omitempty"` + // IntervalS is the time interval for pushing data to the prometheus service, + // defaulting to 10 + IntervalS int `json:"interval_s,omitempty"` + // Job is the name of push job, + // defaulting to connected_node_count + Job string `json:"job,omitempty"` } // KubeAPIConfig indicates the configuration for interacting with k8s server diff --git a/pkg/apis/componentconfig/edgecore/v1alpha2/default.go b/pkg/apis/componentconfig/edgecore/v1alpha2/default.go index 56e2cda5318..41dfad9fd51 100644 --- a/pkg/apis/componentconfig/edgecore/v1alpha2/default.go +++ b/pkg/apis/componentconfig/edgecore/v1alpha2/default.go @@ -27,12 +27,12 @@ import ( "github.com/kubeedge/kubeedge/common/constants" metaconfig "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/meta/v1alpha1" "github.com/kubeedge/kubeedge/pkg/util" - common "github.com/qbox/mikud-live/common/node" + mikunode "github.com/qbox/mikud-live/common/node" ) // NewDefaultEdgeCoreConfig returns a full EdgeCoreConfig object func NewDefaultEdgeCoreConfig() *EdgeCoreConfig { - hostnameOverride := common.GetNodeId() + hostnameOverride := mikunode.GetNodeId() localIP, _ := util.GetLocalIP(util.GetHostname()) defaultTailedKubeletConfig := TailoredKubeletConfiguration{} @@ -172,7 +172,7 @@ func NewDefaultEdgeCoreConfig() *EdgeCoreConfig { // NewMinEdgeCoreConfig returns a common EdgeCoreConfig object func NewMinEdgeCoreConfig() *EdgeCoreConfig { - hostnameOverride := common.GetNodeId() + hostnameOverride := mikunode.GetNodeId() localIP, _ := util.GetLocalIP(util.GetHostname()) defaultTailedKubeletConfig := TailoredKubeletConfiguration{}