From c635b0ba8b01782410d6c0b7f62fd8d61c81f81d Mon Sep 17 00:00:00 2001 From: Adi Muraru Date: Mon, 20 Sep 2021 14:49:13 +0300 Subject: [PATCH] [INTERNAL] Export numerical kafka_topic_info parameters/labels as individual metrics Export as numerical metrics: - topic_info_partitions_count - topic_info_replication_factor - topic_info_min_insync_replicas - topic_info_retention_ms Partial fix for #116 --- prometheus/collect_topic_info.go | 32 +++++++++++++++++++++++++++ prometheus/exporter.go | 38 +++++++++++++++++++++++++++----- 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/prometheus/collect_topic_info.go b/prometheus/collect_topic_info.go index 7474ec1..9ece4c4 100644 --- a/prometheus/collect_topic_info.go +++ b/prometheus/collect_topic_info.go @@ -79,6 +79,38 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me float64(1), labelsValues..., ) + ch <- prometheus.MustNewConstMetric( + e.topicInfoPartitionsCount, + prometheus.GaugeValue, + float64(partitionCount), + *topic.Topic, + ) + ch <- prometheus.MustNewConstMetric( + e.topicInfoReplicationFactor, + prometheus.GaugeValue, + float64(replicationFactor), + *topic.Topic, + ) + if parameter, exists := configsByTopic[*topic.Topic]["min.insync.replicas"]; exists { + if value, err := strconv.ParseFloat(parameter, 64); err == nil { + ch <- prometheus.MustNewConstMetric( + e.topicInfoMinInsyncReplicas, + prometheus.GaugeValue, + value, + *topic.Topic, + ) + } + } + if parameter, exists := configsByTopic[*topic.Topic]["retention.ms"]; exists { + if value, err := strconv.ParseFloat(parameter, 64); err == nil { + ch <- prometheus.MustNewConstMetric( + e.topicInfoRetentionMs, + prometheus.GaugeValue, + value, + *topic.Topic, + ) + } + } } return isOk } diff --git a/prometheus/exporter.go b/prometheus/exporter.go index d717bcf..cafa603 100644 --- a/prometheus/exporter.go +++ b/prometheus/exporter.go @@ -32,11 +32,15 @@ type Exporter struct { topicLogDirSize *prometheus.Desc // Topic / Partition - topicInfo *prometheus.Desc - topicHighWaterMarkSum *prometheus.Desc - partitionHighWaterMark *prometheus.Desc - topicLowWaterMarkSum *prometheus.Desc - partitionLowWaterMark *prometheus.Desc + topicInfo *prometheus.Desc + topicInfoPartitionsCount *prometheus.Desc + topicInfoReplicationFactor *prometheus.Desc + topicInfoMinInsyncReplicas *prometheus.Desc + topicInfoRetentionMs *prometheus.Desc + topicHighWaterMarkSum *prometheus.Desc + partitionHighWaterMark *prometheus.Desc + topicLowWaterMarkSum *prometheus.Desc + partitionLowWaterMark *prometheus.Desc // Consumer Groups consumerGroupInfo *prometheus.Desc @@ -114,6 +118,30 @@ func (e *Exporter) InitializeMetrics() { labels, nil, ) + e.topicInfoPartitionsCount = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info_partitions_count"), + "Partitions configuration for a given topic", + []string{"topic_name"}, + nil, + ) + e.topicInfoReplicationFactor = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info_replication_factor"), + "Replication factor configuration for a given topic", + []string{"topic_name"}, + nil, + ) + e.topicInfoMinInsyncReplicas = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info_min_insync_replicas"), + "Min in-sync replicas configuration for a given topic", + []string{"topic_name"}, + nil, + ) + e.topicInfoRetentionMs = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info_retention_ms"), + "Retention time for a given topic", + []string{"topic_name"}, + nil, + ) // Partition Low Water Mark e.partitionLowWaterMark = prometheus.NewDesc( prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_partition_low_water_mark"),