Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filter on topic name for metric kafka_topic_log_dir_size_total_bytes #237

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions minion/config_log_dirs.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,42 @@
package minion

import (
"fmt"
)

type LogDirsConfig struct {
// Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior
// to version 1.0.0 as describing log dirs was not supported back then.
Enabled bool `koanf:"enabled"`
// AllowedTopics are regex strings of topic names whose topic metrics that shall be exported.
AllowedTopics []string `koanf:"allowedTopics"`

// IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics
// take precedence over allowed topics.
IgnoredTopics []string `koanf:"ignoredTopics"`
}

// Validate if provided LogDirsConfig is valid.
func (c *LogDirsConfig) Validate() error {
// Check whether each provided string is valid regex
for _, topic := range c.AllowedTopics {
_, err := compileRegex(topic)
if err != nil {
return fmt.Errorf("allowed topic string '%v' is not valid regex", topic)
}
}

for _, topic := range c.IgnoredTopics {
_, err := compileRegex(topic)
if err != nil {
return fmt.Errorf("ignored topic string '%v' is not valid regex", topic)
}
}
return nil
}

// SetDefaults for topic config
func (c *LogDirsConfig) SetDefaults() {
c.Enabled = true
c.AllowedTopics = []string{"/.*/"}
}
14 changes: 10 additions & 4 deletions minion/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ type Service struct {
cache map[string]interface{}
cacheLock sync.RWMutex

AllowedGroupIDsExpr []*regexp.Regexp
IgnoredGroupIDsExpr []*regexp.Regexp
AllowedTopicsExpr []*regexp.Regexp
IgnoredTopicsExpr []*regexp.Regexp
AllowedGroupIDsExpr []*regexp.Regexp
IgnoredGroupIDsExpr []*regexp.Regexp
AllowedTopicsExpr []*regexp.Regexp
IgnoredTopicsExpr []*regexp.Regexp
AllowedlogDirsTopicsExpr []*regexp.Regexp
IgnoredlogDirsTopicsExpr []*regexp.Regexp

client *kgo.Client
storage *Storage
Expand Down Expand Up @@ -67,6 +69,8 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics
ignoredGroupIDsExpr, _ := compileRegexes(cfg.ConsumerGroups.IgnoredGroupIDs)
allowedTopicsExpr, _ := compileRegexes(cfg.Topics.AllowedTopics)
ignoredTopicsExpr, _ := compileRegexes(cfg.Topics.IgnoredTopics)
allowedlogDirsTopicsExpr, _ := compileRegexes(cfg.LogDirs.AllowedTopics)
ignoredlogDirsTopicsExpr, _ := compileRegexes(cfg.LogDirs.IgnoredTopics)

service := &Service{
Cfg: cfg,
Expand All @@ -80,6 +84,8 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics
IgnoredGroupIDsExpr: ignoredGroupIDsExpr,
AllowedTopicsExpr: allowedTopicsExpr,
IgnoredTopicsExpr: ignoredTopicsExpr,
AllowedlogDirsTopicsExpr: allowedlogDirsTopicsExpr,
IgnoredlogDirsTopicsExpr: ignoredlogDirsTopicsExpr,

client: client,
storage: storage,
Expand Down
18 changes: 18 additions & 0 deletions minion/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,24 @@ func (s *Service) IsTopicAllowed(topicName string) bool {
return isAllowed
}

func (s *Service) IsLOgDirsTopicAllowed(topicName string) bool {
isAllowed := false
for _, regex := range s.AllowedlogDirsTopicsExpr {
if regex.MatchString(topicName) {
isAllowed = true
break
}
}

for _, regex := range s.IgnoredlogDirsTopicsExpr {
if regex.MatchString(topicName) {
isAllowed = false
break
}
}
return isAllowed
}

func compileRegex(expr string) (*regexp.Regexp, error) {
if strings.HasPrefix(expr, "/") && strings.HasSuffix(expr, "/") {
substr := expr[1 : len(expr)-1]
Expand Down
3 changes: 3 additions & 0 deletions prometheus/collect_log_dirs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func (e *Exporter) collectLogDirs(ctx context.Context, ch chan<- prometheus.Metr
continue
}
for _, topic := range dir.Topics {
if !e.minionSvc.IsLOgDirsTopicAllowed(topic.Topic) {
continue
}
topicSize := int64(0)
for _, partition := range topic.Partitions {
topicSize += partition.Size
Expand Down