From b201f8c6279a321ca65f817901fed0bb9f53b9a2 Mon Sep 17 00:00:00 2001 From: Razvan Dobre Date: Thu, 27 Jul 2023 16:39:14 +0300 Subject: [PATCH] Collect metrics for a set of CG states ADDENDUM (#8) --- minion/consumer_group_offsets.go | 10 +--------- minion/describe_consumer_groups.go | 3 ++- minion/storage.go | 4 ++-- minion/utils.go | 11 ++++++++++- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/minion/consumer_group_offsets.go b/minion/consumer_group_offsets.go index 43a105a..5ce9932 100644 --- a/minion/consumer_group_offsets.go +++ b/minion/consumer_group_offsets.go @@ -23,17 +23,9 @@ func (s *Service) ListAllConsumerGroupOffsetsAdminAPI(ctx context.Context) (map[ return nil, fmt.Errorf("failed to list groupsRes: %w", err) } groupIDs := make([]string, len(groupsRes.AllowedGroups.Groups)) - groupStatesMap := s.Cfg.ConsumerGroups.GetAllowedConsumerGroupStates() for i, group := range groupsRes.AllowedGroups.Groups { - if len(groupStatesMap) == 0 { - groupIDs[i] = group.Group - } else { - // only add group if it's state is allowed - if _, ok := groupStatesMap[group.GroupState]; ok { - groupIDs[i] = group.Group - } - } + groupIDs[i] = group.Group } return s.listConsumerGroupOffsetsBulk(ctx, groupIDs) diff --git a/minion/describe_consumer_groups.go b/minion/describe_consumer_groups.go index 3ebdd37..ee6e2b9 100644 --- a/minion/describe_consumer_groups.go +++ b/minion/describe_consumer_groups.go @@ -33,8 +33,9 @@ func (s *Service) listConsumerGroupsCached(ctx context.Context) (*GroupsInfo, er return nil, err } allowedGroups := make([]kmsg.ListGroupsResponseGroup, 0) + for i := range res.Groups { - if s.IsGroupAllowed(res.Groups[i].Group) { + if s.IsGroupAllowed(res.Groups[i].Group, res.Groups[i].GroupState) { allowedGroups = append(allowedGroups, res.Groups[i]) } } diff --git a/minion/storage.go b/minion/storage.go index 57036c7..10e5c55 100644 --- a/minion/storage.go +++ b/minion/storage.go @@ -108,7 +108,7 @@ func (s *Storage) getNumberOfConsumedRecords() float64 { return s.consumedRecords.Load() } -func (s *Storage) getGroupOffsets(isAllowed func(groupName string) bool) map[string]map[string]map[int32]OffsetCommit { +func (s *Storage) getGroupOffsets(isAllowed func(groupName string, groupState string) bool) map[string]map[string]map[int32]OffsetCommit { // Offsets by group, topic, partition offsetsByGroup := make(map[string]map[string]map[int32]OffsetCommit) @@ -121,7 +121,7 @@ func (s *Storage) getGroupOffsets(isAllowed func(groupName string) bool) map[str for _, offset := range offsets { val := offset.(OffsetCommit) - if !isAllowed(val.Key.Group) { + if !isAllowed(val.Key.Group, "") { continue } diff --git a/minion/utils.go b/minion/utils.go index 3049b8f..9b26001 100644 --- a/minion/utils.go +++ b/minion/utils.go @@ -6,7 +6,7 @@ import ( "strings" ) -func (s *Service) IsGroupAllowed(groupName string) bool { +func (s *Service) IsGroupAllowed(groupName string, groupState string) bool { isAllowed := false for _, regex := range s.AllowedGroupIDsExpr { if regex.MatchString(groupName) { @@ -21,6 +21,15 @@ func (s *Service) IsGroupAllowed(groupName string) bool { break } } + + if isAllowed && groupState != "" { + groupStatesMap := s.Cfg.ConsumerGroups.GetAllowedConsumerGroupStates() + if len(groupStatesMap) > 0 { + if _, ok := groupStatesMap[groupState]; !ok { + isAllowed = false + } + } + } return isAllowed }