diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go
index 008c6281..326aab9f 100644
--- a/cmd/icinga-kubernetes/main.go
+++ b/cmd/icinga-kubernetes/main.go
@@ -7,11 +7,17 @@ import (
+	"github.com/icinga/icinga-go-library/config"
+	"github.com/icinga/icinga-go-library/logging"
 	"github.com/icinga/icinga-kubernetes/internal"
 	"github.com/icinga/icinga-kubernetes/pkg/com"
 	"github.com/icinga/icinga-kubernetes/pkg/database"
+	"github.com/icinga/icinga-kubernetes/pkg/metrics"
 	"github.com/icinga/icinga-kubernetes/pkg/periodic"
 	schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1"
 	"github.com/icinga/icinga-kubernetes/pkg/sync"
 	syncv1 "github.com/icinga/icinga-kubernetes/pkg/sync/v1"
 	k8sMysql "github.com/icinga/icinga-kubernetes/schema/mysql"
+	"github.com/pkg/errors"
+	promapi "github.com/prometheus/client_golang/api"
+	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
 	"golang.org/x/sync/errgroup"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/informers"
@@ -19,6 +25,7 @@ import (
 	kclientcmd "k8s.io/client-go/tools/clientcmd"
 	"k8s.io/klog/v2"
 	"os"
+	"strconv"
 	"strings"
 	"time"
 )
@@ -53,7 +60,19 @@ func main() {
 	factory := informers.NewSharedInformerFactory(clientset, 0)
 	log := klog.NewKlogr()
 
-	d, err := database.FromYAMLFile(config)
+	cfg, err := config.FromYAMLFile[internal.Config](flags.Config)
+	if err != nil {
+		logging.Fatal(errors.Wrap(err, "can't create configuration"))
+	}
+
+	promClient, err := promapi.NewClient(promapi.Config{Address: cfg.Prometheus.Host + ":" + strconv.Itoa(cfg.Prometheus.Port)})
+	if err != nil {
+		logging.Fatal(errors.Wrap(err, "can't create Prometheus API client"))
+	}
+
+	promApiClient := promv1.NewAPI(promClient)
+
+	db, err := database.NewDbFromConfig(&cfg.Database, logs.GetChildLogger("Database"))
 	if err != nil {
 		klog.Fatal(err)
 	}
@@ -176,6 +195,12 @@ func main() {
 		return s.Run(ctx)
 	})
 
+	g.Go(func() error {
+		promMetricSync := metrics.NewPromMetricSync(promApiClient, db)
+
+		return promMetricSync.Run(ctx)
+	})
+
 	errs := make(chan error, 1)
 	defer close(errs)
 
diff --git a/config.example.yml b/config.example.yml
index 66ab7926..bc271cc9 100644
--- a/config.example.yml
+++ b/config.example.yml
@@ -20,3 +20,12 @@ database:
 
   # Database password.
   password: CHANGEME
+
+# Configuration for the Prometheus metrics API.
+prometheus:
+
+  # Prometheus host.
+#  host: http://localhost
+
+  # Prometheus port.
+#  port: 9090
diff --git a/internal/config.go b/internal/config.go
new file mode 100644
index 00000000..b9468d7f
--- /dev/null
+++ b/internal/config.go
@@ -0,0 +1,25 @@
+package internal
+
+import (
+	"github.com/icinga/icinga-go-library/database"
+	"github.com/icinga/icinga-kubernetes/pkg/metrics"
+)
+
+// Config defines the Icinga Kubernetes configuration.
+type Config struct {
+	Database   database.Config          `yaml:"database"`
+	Prometheus metrics.PrometheusConfig `yaml:"prometheus"`
+}
+
+// Validate checks constraints in the supplied configuration and returns an error if they are violated.
+func (c *Config) Validate() error {
+	if err := c.Database.Validate(); err != nil {
+		return err
+	}
+
+	if err := c.Prometheus.Validate(); err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/pkg/metrics/config.go b/pkg/metrics/config.go
new file mode 100644
index 00000000..cb664493
--- /dev/null
+++ b/pkg/metrics/config.go
@@ -0,0 +1,18 @@
+package metrics
+
+import "github.com/pkg/errors"
+
+// PrometheusConfig defines the Prometheus configuration.
+type PrometheusConfig struct {
+	Host string `yaml:"host"`
+	Port int    `yaml:"port"`
+}
+
+// Validate checks constraints in the supplied Prometheus configuration and returns an error if they are violated.
+func (c *PrometheusConfig) Validate() error {
+	if c.Host == "" {
+		return errors.New("Prometheus host missing")
+	}
+
+	return nil
+}
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
new file mode 100644
index 00000000..f914f2d3
--- /dev/null
+++ b/pkg/metrics/metrics.go
@@ -0,0 +1,570 @@
+package metrics
+
+import (
+	"context"
+	"crypto/sha1"
+	"fmt"
+	"github.com/icinga/icinga-go-library/database"
+	"github.com/icinga/icinga-kubernetes/pkg/schema"
+	"github.com/pkg/errors"
+	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	"github.com/prometheus/common/model"
+	"golang.org/x/sync/errgroup"
+	"time"
+)
+
+// PromQuery defines a Prometheus query together with its metric group and an optional label to take the metric name from.
+type PromQuery struct {
+	metricGroup string
+	query       string
+	nameLabel   model.LabelName
+}
+
+// PromMetricSync synchronizes metrics from the Prometheus API to the database.
+type PromMetricSync struct {
+	promApiClient v1.API
+	db            *database.DB
+}
+
+// NewPromMetricSync creates a new PromMetricSync.
+func NewPromMetricSync(promApiClient v1.API, db *database.DB) *PromMetricSync {
+	return &PromMetricSync{
+		promApiClient: promApiClient,
+		db:            db,
+	}
+}
+
+// promMetricClusterUpsertStmt returns the database statement to upsert cluster metrics.
+func (pms *PromMetricSync) promMetricClusterUpsertStmt() string {
+	return fmt.Sprintf(
+		`INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s`,
+		`prometheus_cluster_metric`,
+		"timestamp, `group`, name, value",
+		`:timestamp, :group, :name, :value`,
+		`value=VALUES(value)`,
+	)
+}
+
+// promMetricNodeUpsertStmt returns the database statement to upsert node metrics.
+func (pms *PromMetricSync) promMetricNodeUpsertStmt() string {
+	return fmt.Sprintf(
+		`INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s`,
+		`prometheus_node_metric`,
+		"node_id, timestamp, `group`, name, value",
+		`:node_id, :timestamp, :group, :name, :value`,
+		`value=VALUES(value)`,
+	)
+}
+
+// promMetricPodUpsertStmt returns the database statement to upsert pod metrics.
+func (pms *PromMetricSync) promMetricPodUpsertStmt() string {
+	return fmt.Sprintf(
+		`INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s`,
+		`prometheus_pod_metric`,
+		"pod_id, timestamp, `group`, name, value",
+		`:pod_id, :timestamp, :group, :name, :value`,
+		`value=VALUES(value)`,
+	)
+}
+
+// promMetricContainerUpsertStmt returns the database statement to upsert container metrics.
+func (pms *PromMetricSync) promMetricContainerUpsertStmt() string {
+	return fmt.Sprintf(
+		`INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s`,
+		`prometheus_container_metric`,
+		"container_id, timestamp, `group`, name, value",
+		`:container_id, :timestamp, :group, :name, :value`,
+		`value=VALUES(value)`,
+	)
+}
+
+// Run starts syncing the Prometheus metrics to the database.
+// It maintains a fixed set of metric queries for the cluster, its nodes, pods and containers.
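+// Each query runs in its own goroutine, is re-executed roughly once per minute,
+// and streams its samples, with timestamps truncated to the full minute, into
+// the matching upsert channel, which a database upsert stream then consumes.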
+func (pms *PromMetricSync) Run(ctx context.Context) error {
+	g, ctx := errgroup.WithContext(ctx)
+
+	upsertClusterMetrics := make(chan database.Entity)
+	upsertNodeMetrics := make(chan database.Entity)
+	upsertPodMetrics := make(chan database.Entity)
+	upsertContainerMetrics := make(chan database.Entity)
+
+	promQueriesCluster := []PromQuery{
+		{
+			"node.count",
+			`count(group by (node) (kube_node_info))`,
+			"",
+		},
+		{
+			"namespace.count",
+			`count(kube_namespace_created)`,
+			"",
+		},
+		{
+			"pod.running",
+			`sum(kube_pod_status_phase{phase="Running"})`,
+			"",
+		},
+		{
+			"pod.pending",
+			`sum(kube_pod_status_phase{phase="Pending"})`,
+			"",
+		},
+		{
+			"pod.failed",
+			`sum(kube_pod_status_phase{phase="Failed"})`,
+			"",
+		},
+		{
+			"pod.succeeded",
+			`sum(kube_pod_status_phase{phase="Succeeded"})`,
+			"",
+		},
+		{
+			"cpu.usage",
+			`avg(sum by (instance, cpu) (rate(node_cpu_seconds_total{mode!~"idle|iowait|steal"}[1m])))`,
+			"",
+		},
+		{
+			"memory.usage",
+			`sum(node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / sum(node_memory_MemTotal_bytes)`,
+			"",
+		},
+		{
+			"qos_by_class",
+			`sum by (qos_class) (kube_pod_status_qos_class)`,
+			"",
+		},
+		{
+			"network.received.bytes",
+			`sum by (device) (rate(node_network_receive_bytes_total{device!~"(veth|azv|lxc).*"}[2m]))`,
+			"",
+		},
+		{
+			"network.transmitted.bytes",
+			`- sum by (device) (rate(node_network_transmit_bytes_total{device!~"(veth|azv|lxc).*"}[2m]))`,
+			"",
+		},
+		{
+			"network.received.bytes.bydevice",
+			`sum by (device) (rate(node_network_receive_bytes_total{device!~"(veth|azv|lxc).*"}[2m]))`,
+			"device",
+		},
+	}
+
+	promQueriesNode := []PromQuery{
+		{
+			"cpu.usage",
+			`avg by (instance) (sum by (instance, cpu) (rate(node_cpu_seconds_total{mode!~"idle|iowait|steal"}[1m])))`,
+			"",
+		},
+		{
+			"cpu.request",
+			`sum by (node) (kube_pod_container_resource_requests{resource="cpu"})`,
+			"",
+		},
+		{
+			"cpu.request.percentage",
+			`sum by (node) (kube_pod_container_resource_requests{resource="cpu"}) / on(node) group_left() (sum by (node) (machine_cpu_cores))`,
+			"",
+		},
+		{
+			"cpu.limit",
+			`sum by (node) (kube_pod_container_resource_limits{resource="cpu"})`,
+			"",
+		},
+		{
+			"cpu.limit.percentage",
+			`sum by (node) (kube_pod_container_resource_limits{resource="cpu"}) / on(node) group_left() (sum by (node) (machine_cpu_cores))`,
+			"",
+		},
+		{
+			"memory.usage",
+			`sum by (instance) (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / sum by (instance) (node_memory_MemTotal_bytes)`,
+			"",
+		},
+		{
+			"memory.request",
+			`sum by (node) (kube_pod_container_resource_requests{resource="memory"})`,
+			"",
+		},
+		{
+			"memory.request.percentage",
+			`sum by (node) (kube_pod_container_resource_requests{resource="memory"}) / on(node) group_left() (sum by (node) (machine_memory_bytes))`,
+			"",
+		},
+		{
+			"memory.limit",
+			`sum by (node) (kube_pod_container_resource_limits{resource="memory"})`,
+			"",
+		},
+		{
+			"memory.limit.percentage",
+			`sum by (node) (kube_pod_container_resource_limits{resource="memory"}) / on(node) group_left() (sum by (node) (machine_memory_bytes))`,
+			"",
+		},
+		{
+			"network.received.bytes",
+			`sum by (instance) (rate(node_network_receive_bytes_total[2m]))`,
+			"",
+		},
+		{
+			"network.transmitted.bytes",
+			`- sum by (instance) (rate(node_network_transmit_bytes_total[2m]))`,
+			"",
+		},
+		{
+			"filesystem.usage",
+			`sum by (instance, mountpoint) (1 - (node_filesystem_avail_bytes / node_filesystem_size_bytes))`,
+			"mountpoint",
+		},
+	}
+
+	promQueriesPod := []PromQuery{
+		{
+			"cpu.usage",
+			`sum by (node, namespace, pod) (rate(container_cpu_usage_seconds_total[1m]))`,
+			"",
+		},
+		{
+			"memory.usage",
+			`sum by (node, namespace, pod) (container_memory_usage_bytes) / on (node) group_left(instance) label_replace(node_memory_MemTotal_bytes, "node", "$1", "instance", "(.*)")`,
+			"",
+		},
+		{
+			"cpu.usage.cores",
+			`sum by (namespace, pod) (rate(container_cpu_usage_seconds_total[1m]))`,
+			"",
+		},
+		{
+			"memory.usage.bytes",
+			`sum by (namespace, pod) (container_memory_usage_bytes)`,
+			"",
+		},
+		{
+			"cpu.request",
+			`sum by (node, namespace, pod) (kube_pod_container_resource_requests{resource="cpu"})`,
+			"",
+		},
+		{
+			"cpu.request.percentage",
+			`sum by (node, namespace, pod) (kube_pod_container_resource_requests{resource="cpu"}) / on(node) group_left() (sum by (node) (machine_cpu_cores))`,
+			"",
+		},
+		{
+			"cpu.limit",
+			`sum by (node, namespace, pod) (kube_pod_container_resource_limits{resource="cpu"})`,
+			"",
+		},
+		{
+			"cpu.limit.percentage",
+			`sum by (node, namespace, pod) (kube_pod_container_resource_limits{resource="cpu"}) / on(node) group_left() (sum by (node) (machine_cpu_cores))`,
+			"",
+		},
+		{
+			"memory.request",
+			`sum by (node, namespace, pod) (kube_pod_container_resource_requests{resource="memory"})`,
+			"",
+		},
+		{
+			"memory.request.percentage",
+			`sum by (node, namespace, pod) (kube_pod_container_resource_requests{resource="memory"}) / on(node) group_left() (sum by (node) (machine_memory_bytes))`,
+			"",
+		},
+		{
+			"memory.limit",
+			`sum by (node, namespace, pod) (kube_pod_container_resource_limits{resource="memory"})`,
+			"",
+		},
+		{
+			"memory.limit.percentage",
+			`sum by (node, namespace, pod) (kube_pod_container_resource_limits{resource="memory"}) / on(node) group_left() (sum by (node) (machine_memory_bytes))`,
+			"",
+		},
+	}
+
+	promQueriesContainer := []PromQuery{
+		{
+			"cpu.request",
+			`sum by (node, namespace, pod, container) (kube_pod_container_resource_requests{resource="cpu"})`,
+			"",
+		},
+		{
+			"cpu.request.percentage",
+			`sum by (node, namespace, pod, container) (kube_pod_container_resource_requests{resource="cpu"}) / on(node) group_left() (sum by (node) (machine_cpu_cores))`,
+			"",
+		},
+		{
+			"cpu.limit",
+			`sum by (node, namespace, pod, container) (kube_pod_container_resource_limits{resource="cpu"})`,
+			"",
+		},
+		{
+			"cpu.limit.percentage",
+			`sum by (node, namespace, pod, container) (kube_pod_container_resource_limits{resource="cpu"}) / on(node) group_left() (sum by (node) (machine_cpu_cores))`,
+			"",
+		},
+		{
+			"memory.request",
+			`sum by (node, namespace, pod, container) (kube_pod_container_resource_requests{resource="memory"})`,
+			"",
+		},
+		{
+			"memory.request.percentage",
+			`sum by (node, namespace, pod, container) (kube_pod_container_resource_requests{resource="memory"}) / on(node) group_left() (sum by (node) (machine_memory_bytes))`,
+			"",
+		},
+		{
+			"memory.limit",
+			`sum by (node, namespace, pod, container) (kube_pod_container_resource_limits{resource="memory"})`,
+			"",
+		},
+		{
+			"memory.limit.percentage",
+			`sum by (node, namespace, pod, container) (kube_pod_container_resource_limits{resource="memory"}) / on(node) group_left() (sum by (node) (machine_memory_bytes))`,
+			"",
+		},
+	}
+
+	for _, promQuery := range promQueriesCluster {
+		promQuery := promQuery // Capture the loop variable for the closure below.
+
+		g.Go(func() error {
+			for {
+				result, warnings, err := pms.promApiClient.Query(
+					ctx,
+					promQuery.query,
+					time.Time{},
+				)
+				if err != nil {
+					return errors.Wrap(err, "error querying Prometheus")
+				}
+				if len(warnings) > 0 {
+					fmt.Printf("Warnings: %v\n", warnings)
+				}
+				if result == nil {
+					fmt.Println("No results found")
+				} else {
+					for _, res := range result.(model.Vector) {
+
+						name := ""
+
+						if promQuery.nameLabel != "" {
+							name = string(res.Metric[promQuery.nameLabel])
+						}
+
+						newClusterMetric := &schema.PrometheusClusterMetric{
+							// Truncate the sample timestamp to the full minute, stored in milliseconds.
+							Timestamp: (res.Timestamp.UnixNano() - res.Timestamp.UnixNano()%(60*1000000000)) / 1000000,
+							Group:     promQuery.metricGroup,
+							Name:      name,
+							Value:     float64(res.Value),
+						}
+
+						select {
+						case upsertClusterMetrics <- newClusterMetric:
+						case <-ctx.Done():
+							return ctx.Err()
+						}
+					}
+				}
+
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-time.After(time.Second * 55): // Re-query roughly once per minute.
+				}
+			}
+		})
+	}
+
+	for _, promQuery := range promQueriesNode {
+		promQuery := promQuery
+
+		g.Go(func() error {
+			for {
+				result, warnings, err := pms.promApiClient.Query(
+					ctx,
+					promQuery.query,
+					time.Time{},
+				)
+				if err != nil {
+					return errors.Wrap(err, "error querying Prometheus")
+				}
+				if len(warnings) > 0 {
+					fmt.Printf("Warnings: %v\n", warnings)
+				}
+				if result == nil {
+					fmt.Println("No results found")
+				} else {
+					for _, res := range result.(model.Vector) {
+						nodeName := res.Metric["node"]
+
+						if nodeName == "" { // Fall back to the instance label if the node label is missing.
+							nodeName = res.Metric["instance"]
+						}
+
+						nodeId := sha1.Sum([]byte(nodeName))
+
+						name := ""
+
+						if promQuery.nameLabel != "" {
+							name = string(res.Metric[promQuery.nameLabel])
+						}
+
+						newNodeMetric := &schema.PrometheusNodeMetric{
+							NodeId:    nodeId[:],
+							Timestamp: (res.Timestamp.UnixNano() - res.Timestamp.UnixNano()%(60*1000000000)) / 1000000,
+							Group:     promQuery.metricGroup,
+							Name:      name,
+							Value:     float64(res.Value),
+						}
+
+						select {
+						case upsertNodeMetrics <- newNodeMetric:
+						case <-ctx.Done():
+							return ctx.Err()
+						}
+					}
+				}
+
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-time.After(time.Second * 55): // Re-query roughly once per minute.
+				}
+			}
+		})
+	}
+
+	for _, promQuery := range promQueriesPod {
+		promQuery := promQuery
+
+		g.Go(func() error {
+			for {
+				result, warnings, err := pms.promApiClient.Query(
+					ctx,
+					promQuery.query,
+					time.Time{},
+				)
+				if err != nil {
+					return errors.Wrap(err, "error querying Prometheus")
+				}
+				if len(warnings) > 0 {
+					fmt.Printf("Warnings: %v\n", warnings)
+				}
+				if result == nil {
+					fmt.Println("No results found")
+				} else {
+					for _, res := range result.(model.Vector) {
+
+						if res.Metric["pod"] == "" {
+							continue
+						}
+
+						podId := sha1.Sum([]byte(res.Metric["namespace"] + "/" + res.Metric["pod"]))
+
+						name := ""
+
+						if promQuery.nameLabel != "" {
+							name = string(res.Metric[promQuery.nameLabel])
+						}
+
+						newPodMetric := &schema.PrometheusPodMetric{
+							PodId:     podId[:],
+							Timestamp: (res.Timestamp.UnixNano() - res.Timestamp.UnixNano()%(60*1000000000)) / 1000000,
+							Group:     promQuery.metricGroup,
+							Name:      name,
+							Value:     float64(res.Value),
+						}
+
+						select {
+						case upsertPodMetrics <- newPodMetric:
+						case <-ctx.Done():
+							return ctx.Err()
+						}
+					}
+				}
+
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-time.After(time.Second * 55): // Re-query roughly once per minute.
+				}
+			}
+		})
+	}
+
+	for _, promQuery := range promQueriesContainer {
+		promQuery := promQuery
+
+		g.Go(func() error {
+			for {
+				result, warnings, err := pms.promApiClient.Query(
+					ctx,
+					promQuery.query,
+					time.Time{},
+				)
+				if err != nil {
+					return errors.Wrap(err, "error querying Prometheus")
+				}
+				if len(warnings) > 0 {
+					fmt.Printf("Warnings: %v\n", warnings)
+				}
+				if result == nil {
+					fmt.Println("No results found")
+				} else {
+					for _, res := range result.(model.Vector) {
+						containerId := sha1.Sum([]byte(res.Metric["namespace"] + "/" + res.Metric["pod"] + "/" + res.Metric["container"]))
+
+						name := ""
+
+						if promQuery.nameLabel != "" {
+							name = string(res.Metric[promQuery.nameLabel])
+						}
+
+						newContainerMetric := &schema.PrometheusContainerMetric{
+							ContainerId: containerId[:],
+							Timestamp:   (res.Timestamp.UnixNano() - res.Timestamp.UnixNano()%(60*1000000000)) / 1000000,
+							Group:       promQuery.metricGroup,
+							Name:        name,
+							Value:       float64(res.Value),
+						}
+
+						select {
+						case upsertContainerMetrics <- newContainerMetric:
+						case <-ctx.Done():
+							return ctx.Err()
+						}
+					}
+				}
+
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-time.After(time.Second * 55): // Re-query roughly once per minute.
+				}
+			}
+		})
+	}
+
+	g.Go(func() error {
+		return database.NewUpsert(pms.db, database.WithStatement(pms.promMetricClusterUpsertStmt(), 4)).Stream(ctx, upsertClusterMetrics)
+	})
+
+	g.Go(func() error {
+		return database.NewUpsert(pms.db, database.WithStatement(pms.promMetricNodeUpsertStmt(), 5)).Stream(ctx, upsertNodeMetrics)
+	})
+
+	g.Go(func() error {
+		return database.NewUpsert(pms.db, database.WithStatement(pms.promMetricPodUpsertStmt(), 5)).Stream(ctx, upsertPodMetrics)
+	})
+
+	g.Go(func() error {
+		return database.NewUpsert(pms.db, database.WithStatement(pms.promMetricContainerUpsertStmt(), 5)).Stream(ctx, upsertContainerMetrics)
+	})
+
+	return g.Wait()
+}
diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql
index 9f8c0229..7b3c0f58 100644
--- a/schema/mysql/schema.sql
+++ b/schema/mysql/schema.sql
@@ -723,3 +723,46 @@ CREATE TABLE kubernetes_schema (
 
 INSERT INTO kubernetes_schema (version, timestamp, success, reason)
 VALUES ('0.1.0', UNIX_TIMESTAMP() * 1000, 'y', 'Initial import');
+
+CREATE TABLE prometheus_cluster_metric
+(
+  timestamp bigint NOT NULL,
+  `group` varchar(255) NOT NULL,
+  name varchar(255) NOT NULL,
+  value double NOT NULL,
+  PRIMARY KEY (timestamp, `group`, name)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
+
+CREATE TABLE prometheus_node_metric
+(
+  node_id binary(20) NOT NULL,
+  timestamp bigint NOT NULL,
+  `group` varchar(255) NOT NULL,
+  name varchar(255) NOT NULL,
+  value double NOT NULL,
+  PRIMARY KEY (node_id, timestamp, `group`, name)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
+
+CREATE TABLE prometheus_pod_metric
+(
+  pod_id binary(20) NOT NULL,
+  timestamp bigint NOT NULL,
+  `group` varchar(255) NOT NULL,
+  name varchar(255) NOT NULL,
+  value double NOT NULL,
+  PRIMARY KEY (pod_id, timestamp, `group`, name)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
+
+CREATE TABLE prometheus_container_metric
+(
+  container_id binary(20) NOT NULL,
+  timestamp bigint NOT NULL,
+  `group` varchar(255) NOT NULL,
+  name varchar(255) NOT NULL,
+  value double NOT NULL,
+  PRIMARY KEY (container_id, timestamp, `group`, name)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
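+
+-- `group` is quoted in the metric tables above since GROUP is a reserved word
+-- in MySQL. The timestamp columns store Unix timestamps in milliseconds,
+-- truncated to the full minute by the Prometheus metric sync.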