Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
- Use snake case in river tags
- Export minimum API from packages
- Re-use existing config type for mimir client
- Implement component health
- Set defaults in Unmarshal method
- Remove exports type
- Use river blocks where appropriate
- Rename to mimir.rules.kubernetes
- Initialize with config during construction

Co-authored-by: Robert Fratto <[email protected]>
  • Loading branch information
Logiraptor and rfratto committed Dec 13, 2022
1 parent 2dee167 commit a012337
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 212 deletions.
8 changes: 4 additions & 4 deletions component/mimir/rules/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ import "fmt"

// DebugInfo is the debug payload for this component: an optional error string
// plus one entry per PrometheusRule and per Mimir rule namespace.
// NOTE(review): presumably the value returned by Component.DebugInfo; that
// method's body is not visible here, so confirm.
type DebugInfo struct {
Error string `river:"error,attr,optional"`
PrometheusRules []DebugK8sPrometheusRule `river:"prometheusRules,attr,optional"`
MimirRuleNamespaces []DebugMimirNamespace `river:"mimirRuleNamespaces,attr,optional"`
PrometheusRules []DebugK8sPrometheusRule `river:"prometheus_rule,block,optional"`
MimirRuleNamespaces []DebugMimirNamespace `river:"mimir_rule_namespace,block,optional"`
}

// DebugK8sPrometheusRule is one element of DebugInfo.PrometheusRules. It
// identifies a rule resource by namespace, name and UID, and carries a count
// of its rule groups.
type DebugK8sPrometheusRule struct {
Namespace string `river:"namespace,attr"`
Name string `river:"name,attr"`
UID string `river:"uid,attr"`
NumRuleGroups int `river:"numRuleGroups,attr"`
NumRuleGroups int `river:"num_rule_groups,attr"`
}

// DebugMimirNamespace is one element of DebugInfo.MimirRuleNamespaces: a
// namespace name together with a count of its rule groups.
type DebugMimirNamespace struct {
Name string `river:"name,attr"`
NumRuleGroups int `river:"numRuleGroups,attr"`
NumRuleGroups int `river:"num_rule_groups,attr"`
}

func (c *Component) DebugInfo() interface{} {
Expand Down
32 changes: 16 additions & 16 deletions component/mimir/rules/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,24 @@ import (
"gopkg.in/yaml.v3"
)

type RuleGroupDiffKind string
type ruleGroupDiffKind string

const (
RuleGroupDiffKindAdd RuleGroupDiffKind = "add"
RuleGroupDiffKindRemove RuleGroupDiffKind = "remove"
RuleGroupDiffKindUpdate RuleGroupDiffKind = "update"
ruleGroupDiffKindAdd ruleGroupDiffKind = "add"
ruleGroupDiffKindRemove ruleGroupDiffKind = "remove"
ruleGroupDiffKindUpdate ruleGroupDiffKind = "update"
)

type RuleGroupDiff struct {
Kind RuleGroupDiffKind
type ruleGroupDiff struct {
Kind ruleGroupDiffKind
Actual mimirClient.RuleGroup
Desired mimirClient.RuleGroup
}

func diffRuleState(desired map[string][]mimirClient.RuleGroup, actual map[string][]mimirClient.RuleGroup) (map[string][]RuleGroupDiff, error) {
func diffRuleState(desired map[string][]mimirClient.RuleGroup, actual map[string][]mimirClient.RuleGroup) (map[string][]ruleGroupDiff, error) {
seen := map[string]bool{}

diff := make(map[string][]RuleGroupDiff)
diff := make(map[string][]ruleGroupDiff)

for namespace, desiredRuleGroups := range desired {
seen[namespace] = true
Expand Down Expand Up @@ -59,8 +59,8 @@ func diffRuleState(desired map[string][]mimirClient.RuleGroup, actual map[string
return diff, nil
}

func diffRuleNamespaceState(desired []mimirClient.RuleGroup, actual []mimirClient.RuleGroup) ([]RuleGroupDiff, error) {
var diff []RuleGroupDiff
func diffRuleNamespaceState(desired []mimirClient.RuleGroup, actual []mimirClient.RuleGroup) ([]ruleGroupDiff, error) {
var diff []ruleGroupDiff

seenGroups := map[string]bool{}

Expand All @@ -74,17 +74,17 @@ desiredGroups:
continue desiredGroups
}

diff = append(diff, RuleGroupDiff{
Kind: RuleGroupDiffKindUpdate,
diff = append(diff, ruleGroupDiff{
Kind: ruleGroupDiffKindUpdate,
Actual: actualRuleGroup,
Desired: desiredRuleGroup,
})
continue desiredGroups
}
}

diff = append(diff, RuleGroupDiff{
Kind: RuleGroupDiffKindAdd,
diff = append(diff, ruleGroupDiff{
Kind: ruleGroupDiffKindAdd,
Desired: desiredRuleGroup,
})
}
Expand All @@ -94,8 +94,8 @@ desiredGroups:
continue
}

diff = append(diff, RuleGroupDiff{
Kind: RuleGroupDiffKindRemove,
diff = append(diff, ruleGroupDiff{
Kind: ruleGroupDiffKindRemove,
Actual: actualRuleGroup,
})
}
Expand Down
28 changes: 14 additions & 14 deletions component/mimir/rules/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,26 @@ groups:
name string
desired map[string][]mimirClient.RuleGroup
actual map[string][]mimirClient.RuleGroup
expected map[string][]RuleGroupDiff
expected map[string][]ruleGroupDiff
}

testCases := []testCase{
{
name: "empty sets",
desired: map[string][]mimirClient.RuleGroup{},
actual: map[string][]mimirClient.RuleGroup{},
expected: map[string][]RuleGroupDiff{},
expected: map[string][]ruleGroupDiff{},
},
{
name: "add rule group",
desired: map[string][]mimirClient.RuleGroup{
managedNamespace: ruleGroupsA,
},
actual: map[string][]mimirClient.RuleGroup{},
expected: map[string][]RuleGroupDiff{
expected: map[string][]ruleGroupDiff{
managedNamespace: {
{
Kind: RuleGroupDiffKindAdd,
Kind: ruleGroupDiffKindAdd,
Desired: ruleGroupsA[0],
},
},
Expand All @@ -78,10 +78,10 @@ groups:
actual: map[string][]mimirClient.RuleGroup{
managedNamespace: ruleGroupsA,
},
expected: map[string][]RuleGroupDiff{
expected: map[string][]ruleGroupDiff{
managedNamespace: {
{
Kind: RuleGroupDiffKindRemove,
Kind: ruleGroupDiffKindRemove,
Actual: ruleGroupsA[0],
},
},
Expand All @@ -95,10 +95,10 @@ groups:
actual: map[string][]mimirClient.RuleGroup{
managedNamespace: ruleGroupsAModified,
},
expected: map[string][]RuleGroupDiff{
expected: map[string][]ruleGroupDiff{
managedNamespace: {
{
Kind: RuleGroupDiffKindUpdate,
Kind: ruleGroupDiffKindUpdate,
Desired: ruleGroupsA[0],
Actual: ruleGroupsAModified[0],
},
Expand All @@ -113,7 +113,7 @@ groups:
actual: map[string][]mimirClient.RuleGroup{
managedNamespace: ruleGroupsA,
},
expected: map[string][]RuleGroupDiff{},
expected: map[string][]ruleGroupDiff{},
},
}

Expand All @@ -126,16 +126,16 @@ groups:
}
}

func requireEqualRuleDiffs(t *testing.T, expected, actual map[string][]RuleGroupDiff) {
func requireEqualRuleDiffs(t *testing.T, expected, actual map[string][]ruleGroupDiff) {
require.Equal(t, len(expected), len(actual))

var summarizeDiff = func(diff RuleGroupDiff) string {
var summarizeDiff = func(diff ruleGroupDiff) string {
switch diff.Kind {
case RuleGroupDiffKindAdd:
case ruleGroupDiffKindAdd:
return fmt.Sprintf("add: %s", diff.Desired.Name)
case RuleGroupDiffKindRemove:
case ruleGroupDiffKindRemove:
return fmt.Sprintf("remove: %s", diff.Actual.Name)
case RuleGroupDiffKindUpdate:
case ruleGroupDiffKindUpdate:
return fmt.Sprintf("update: %s", diff.Desired.Name)
}
panic("unreachable")
Expand Down
71 changes: 40 additions & 31 deletions component/mimir/rules/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,29 @@ import (

// This type must be hashable, so it is kept simple. The indexer will maintain a
// cache of current state, so this is mostly used for logging.
type Event struct {
Type EventType
ObjectKey string
type event struct {
typ eventType
objectKey string
}

type EventType string
type eventType string

const (
EventTypeResourceChanged EventType = "resource-changed"
EventTypeSyncMimir EventType = "sync-mimir"
eventTypeResourceChanged eventType = "resource-changed"
eventTypeSyncMimir eventType = "sync-mimir"
)

// OnAdd implements the cache.ResourceEventHandler interface.
// It forwards the added object to publishEvent unchanged.
func (c *Component) OnAdd(obj interface{}) {
c.publishEvent(obj)
}

// OnUpdate implements the cache.ResourceEventHandler interface.
// Only newObj is forwarded to publishEvent; oldObj is ignored.
func (c *Component) OnUpdate(oldObj, newObj interface{}) {
c.publishEvent(newObj)
}

// OnDelete implements the cache.ResourceEventHandler interface.
// It forwards the deleted object to publishEvent unchanged.
func (c *Component) OnDelete(obj interface{}) {
c.publishEvent(obj)
}
Expand All @@ -48,70 +51,76 @@ func (c *Component) publishEvent(obj interface{}) {
return
}

c.queue.AddRateLimited(Event{
Type: EventTypeResourceChanged,
ObjectKey: key,
c.queue.AddRateLimited(event{
typ: eventTypeResourceChanged,
objectKey: key,
})
}

// eventLoop takes items from c.queue until the queue reports shutdown, hands
// each one to processEvent, and updates metrics and component health from the
// result. A failed event is re-queued with rate limiting while its requeue
// count is below 5; once that limit is reached it is counted as failed and the
// component is reported unhealthy. A successful event reports the component
// healthy.
func (c *Component) eventLoop(ctx context.Context) {
for {
event, shutdown := c.queue.Get()
eventInterface, shutdown := c.queue.Get()
if shutdown {
level.Info(c.log).Log("msg", "shutting down event loop")
return
}

// Unchecked type assertion: this panics if the queued item has any other
// dynamic type.
evt := event.(Event)
c.metrics.eventsTotal.WithLabelValues(string(evt.Type)).Inc()
evt := eventInterface.(event)
c.metrics.eventsTotal.WithLabelValues(string(evt.typ)).Inc()
err := c.processEvent(ctx, evt)

if err != nil {
retries := c.queue.NumRequeues(event)
retries := c.queue.NumRequeues(evt)
if retries < 5 {
c.metrics.eventsRetried.WithLabelValues(string(evt.Type)).Inc()
c.queue.AddRateLimited(event)
c.metrics.eventsRetried.WithLabelValues(string(evt.typ)).Inc()
c.queue.AddRateLimited(evt)
level.Error(c.log).Log(
"msg", "failed to process event, will retry",
"retries", fmt.Sprintf("%d/5", retries),
"err", err,
)
// Skip the Forget call at the bottom of the loop for events that will be
// retried. NOTE(review): presumably so the queue keeps this event's
// requeue count (client-go workqueue semantics) — confirm.
continue
} else {
c.metrics.eventsFailed.WithLabelValues(string(evt.Type)).Inc()
c.metrics.eventsFailed.WithLabelValues(string(evt.typ)).Inc()
level.Error(c.log).Log(
"msg", "failed to process event, max retries exceeded",
"retries", fmt.Sprintf("%d/5", retries),
"err", err,
)
c.reportUnhealthy(err)
}
} else {
c.reportHealthy()
}

// Reached on success and after retries are exhausted.
c.queue.Forget(event)
c.queue.Forget(evt)
}
}

// processEvent handles a single queued event and then reconciles state.
//
// A resource-changed event is only logged. A sync-mimir event calls syncMimir
// first. Any other event type returns an error without reconciling. In every
// non-error case the result of reconcileState is returned.
func (c *Component) processEvent(ctx context.Context, e Event) error {
func (c *Component) processEvent(ctx context.Context, e event) error {
// Deferred so the queue is told the event is done on every return path.
defer c.queue.Done(e)

switch e.Type {
case EventTypeResourceChanged:
level.Info(c.log).Log("msg", "processing event", "type", e.Type, "key", e.ObjectKey)
case EventTypeSyncMimir:
switch e.typ {
case eventTypeResourceChanged:
level.Info(c.log).Log("msg", "processing event", "type", e.typ, "key", e.objectKey)
case eventTypeSyncMimir:
level.Debug(c.log).Log("msg", "syncing current state from ruler")
c.syncMimir(ctx)
// A sync failure is returned to the caller and reconcileState is skipped.
err := c.syncMimir(ctx)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown event type: %s", e.Type)
return fmt.Errorf("unknown event type: %s", e.typ)
}

return c.reconcileState(ctx)
}

func (c *Component) syncMimir(ctx context.Context) {
func (c *Component) syncMimir(ctx context.Context) error {
rulesByNamespace, err := c.mimirClient.ListRules(ctx, "")
if err != nil {
level.Error(c.log).Log("msg", "failed to list rules from mimir", "err", err)
return
return err
}

for ns := range rulesByNamespace {
Expand All @@ -122,7 +131,7 @@ func (c *Component) syncMimir(ctx context.Context) {

c.currentState = rulesByNamespace

return
return nil
}

func (c *Component) reconcileState(ctx context.Context) error {
Expand Down Expand Up @@ -198,26 +207,26 @@ func convertCRDRuleGroupToRuleGroup(crd promv1.PrometheusRuleSpec) ([]mimirClien
return mimirGroups, nil
}

func (c *Component) applyChanges(ctx context.Context, namespace string, diffs []RuleGroupDiff) error {
func (c *Component) applyChanges(ctx context.Context, namespace string, diffs []ruleGroupDiff) error {
if len(diffs) == 0 {
return nil
}

for _, diff := range diffs {
switch diff.Kind {
case RuleGroupDiffKindAdd:
case ruleGroupDiffKindAdd:
err := c.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired)
if err != nil {
return err
}
level.Info(c.log).Log("msg", "added rule group", "namespace", namespace, "group", diff.Desired.Name)
case RuleGroupDiffKindRemove:
case ruleGroupDiffKindRemove:
err := c.mimirClient.DeleteRuleGroup(ctx, namespace, diff.Actual.Name)
if err != nil {
return err
}
level.Info(c.log).Log("msg", "removed rule group", "namespace", namespace, "group", diff.Actual.Name)
case RuleGroupDiffKindUpdate:
case ruleGroupDiffKindUpdate:
err := c.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions component/mimir/rules/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func TestEventLoop(t *testing.T) {
ruleSelector: labels.Everything(),
mimirClient: newFakeMimirClient(),
args: Arguments{MimirNameSpacePrefix: "agent"},
metrics: newMetrics(),
}

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -133,7 +134,7 @@ func TestEventLoop(t *testing.T) {
require.Eventually(t, func() bool {
return len(handler.currentState) == 1
}, time.Second, 10*time.Millisecond)
handler.queue.AddRateLimited(Event{Type: EventTypeSyncMimir})
handler.queue.AddRateLimited(event{typ: eventTypeSyncMimir})

// Update the rule in kubernetes
rule.Spec.Groups[0].Rules = append(rule.Spec.Groups[0].Rules, v1.Rule{
Expand All @@ -148,7 +149,7 @@ func TestEventLoop(t *testing.T) {
rules := handler.currentState[mimirNamespaceForRuleCRD("agent", rule)][0].Rules
return len(rules) == 2
}, time.Second, 10*time.Millisecond)
handler.queue.AddRateLimited(Event{Type: EventTypeSyncMimir})
handler.queue.AddRateLimited(event{typ: eventTypeSyncMimir})

// Remove the rule from kubernetes
ruleIndexer.Delete(rule)
Expand Down
Loading

0 comments on commit a012337

Please sign in to comment.