diff --git a/prometheus/collect_topic_info.go b/prometheus/collect_topic_info.go index b740378..5f8005c 100644 --- a/prometheus/collect_topic_info.go +++ b/prometheus/collect_topic_info.go @@ -70,7 +70,7 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me labelsValues = append(labelsValues, strconv.Itoa(partitionCount)) labelsValues = append(labelsValues, strconv.Itoa(replicationFactor)) for _, key := range e.minionSvc.Cfg.Topics.InfoMetric.ConfigKeys { - labelsValues = append(labelsValues, getOrDefault(configsByTopic[topic.Topic], key, "N/A")) + labelsValues = append(labelsValues, getOrDefault(configsByTopic[topic.Topic], key, "N/A")) } ch <- prometheus.MustNewConstMetric( e.topicInfo, @@ -78,6 +78,39 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me float64(1), labelsValues..., ) + ch <- prometheus.MustNewConstMetric( + e.topicInfoPartitionsCount, + prometheus.GaugeValue, + float64(partitionCount), + topic.Topic, + ) + ch <- prometheus.MustNewConstMetric( + e.topicInfoReplicationFactor, + prometheus.GaugeValue, + float64(replicationFactor), + topic.Topic, + ) + if parameter, exists := configsByTopic[topic.Topic]["min.insync.replicas"]; exists { + if value, err := strconv.ParseFloat(parameter, 64); err == nil { + ch <- prometheus.MustNewConstMetric( + e.topicInfoMinInsyncReplicas, + prometheus.GaugeValue, + value, + topic.Topic, + ) + } + } + if parameter, exists := configsByTopic[topic.Topic]["retention.ms"]; exists { + if value, err := strconv.ParseFloat(parameter, 64); err == nil { + ch <- prometheus.MustNewConstMetric( + e.topicInfoRetentionMs, + prometheus.GaugeValue, + value, + topic.Topic, + ) + } + } + } return isOk } diff --git a/prometheus/exporter.go b/prometheus/exporter.go index d717bcf..cafa603 100644 --- a/prometheus/exporter.go +++ b/prometheus/exporter.go @@ -32,11 +32,15 @@ type Exporter struct { topicLogDirSize *prometheus.Desc // Topic / Partition - topicInfo *prometheus.Desc - topicHighWaterMarkSum *prometheus.Desc - partitionHighWaterMark *prometheus.Desc - topicLowWaterMarkSum *prometheus.Desc - partitionLowWaterMark *prometheus.Desc + topicInfo *prometheus.Desc + topicInfoPartitionsCount *prometheus.Desc + topicInfoReplicationFactor *prometheus.Desc + topicInfoMinInsyncReplicas *prometheus.Desc + topicInfoRetentionMs *prometheus.Desc + topicHighWaterMarkSum *prometheus.Desc + partitionHighWaterMark *prometheus.Desc + topicLowWaterMarkSum *prometheus.Desc + partitionLowWaterMark *prometheus.Desc // Consumer Groups consumerGroupInfo *prometheus.Desc @@ -114,6 +118,30 @@ func (e *Exporter) InitializeMetrics() { labels, nil, ) + e.topicInfoPartitionsCount = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info_partitions_count"), + "Partitions configuration for a given topic", + []string{"topic_name"}, + nil, + ) + e.topicInfoReplicationFactor = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info_replication_factor"), + "Replication factor configuration for a given topic", + []string{"topic_name"}, + nil, + ) + e.topicInfoMinInsyncReplicas = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info_min_insync_replicas"), + "Min in-sync replicas configuration for a given topic", + []string{"topic_name"}, + nil, + ) + e.topicInfoRetentionMs = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info_retention_ms"), + "Retention time for a given topic", + []string{"topic_name"}, + nil, + ) // Partition Low Water Mark e.partitionLowWaterMark = prometheus.NewDesc( prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_partition_low_water_mark"),