Skip to content

Commit

Permalink
Refactor mimir.rules.kubernetes component (port from agent repo) (#158)
Browse files Browse the repository at this point in the history
* Refactor mimir.rules.kubernetes component

This change refactors the `mimir.rules.kubernetes` component to move most
mutable state into a separate `eventProcessor` struct. The existing code
already split most behavior between the main `rules.go` file and `events.go`
file. This takes the next logical step and wraps the state from `events.go`
into its own struct.

This is a prerequisite for a future PR that will add clustering support to
the `mimir.rules.kubernetes` component. In the future PR a particular agent
instance will be the "leader" for syncing rules from Mimir or not. When not
the leader, the `eventProcesor` will be a no-op.

Signed-off-by: Nick Pillitteri <[email protected]>

---------

Signed-off-by: Nick Pillitteri <[email protected]>
Co-authored-by: Nick Pillitteri <[email protected]>
  • Loading branch information
wildum and 56quarters authored Apr 22, 2024
1 parent a295c53 commit ae4b411
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 137 deletions.
36 changes: 14 additions & 22 deletions internal/component/mimir/rules/kubernetes/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,32 @@ type DebugMimirNamespace struct {

func (c *Component) DebugInfo() interface{} {
var output DebugInfo
for ns := range c.currentState {
if !isManagedMimirNamespace(c.args.MimirNameSpacePrefix, ns) {

currentState := c.eventProcessor.getMimirState()
for namespace := range currentState {
if !isManagedMimirNamespace(c.args.MimirNameSpacePrefix, namespace) {
continue
}

output.MimirRuleNamespaces = append(output.MimirRuleNamespaces, DebugMimirNamespace{
Name: ns,
NumRuleGroups: len(c.currentState[ns]),
Name: namespace,
NumRuleGroups: len(currentState[namespace]),
})
}

// This should load from the informer cache, so it shouldn't fail under normal circumstances.
managedK8sNamespaces, err := c.namespaceLister.List(c.namespaceSelector)
rulesByNamespace, err := c.eventProcessor.getKubernetesState()
if err != nil {
return DebugInfo{
Error: fmt.Sprintf("failed to list namespaces: %v", err),
}
return DebugInfo{Error: fmt.Sprintf("failed to list rules: %v", err)}
}

for _, n := range managedK8sNamespaces {
// This should load from the informer cache, so it shouldn't fail under normal circumstances.
rules, err := c.ruleLister.PrometheusRules(n.Name).List(c.ruleSelector)
if err != nil {
return DebugInfo{
Error: fmt.Sprintf("failed to list rules: %v", err),
}
}

for _, r := range rules {
for namespace, rules := range rulesByNamespace {
for _, rule := range rules {
output.PrometheusRules = append(output.PrometheusRules, DebugK8sPrometheusRule{
Namespace: n.Name,
Name: r.Name,
UID: string(r.UID),
NumRuleGroups: len(r.Spec.Groups),
Namespace: namespace,
Name: rule.Name,
UID: string(rule.UID),
NumRuleGroups: len(rule.Spec.Groups),
})
}
}
Expand Down
194 changes: 140 additions & 54 deletions internal/component/mimir/rules/kubernetes/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,108 +4,165 @@ import (
"context"
"fmt"
"regexp"
"sync"
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/alloy/logging/level"
"github.com/grafana/alloy/internal/component/common/kubernetes"
mimirClient "github.com/grafana/alloy/internal/mimir/client"
"github.com/hashicorp/go-multierror"
promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
promListers "github.com/prometheus-operator/prometheus-operator/pkg/client/listers/monitoring/v1"
"github.com/prometheus/prometheus/model/rulefmt"
"k8s.io/apimachinery/pkg/labels"
coreListers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/yaml" // Used for CRD compatibility instead of gopkg.in/yaml.v2
)

const eventTypeSyncMimir kubernetes.EventType = "sync-mimir"
const (
eventTypeSyncMimir kubernetes.EventType = "sync-mimir"
)

// healthReporter allows the eventProcessor to mark the owning component as healthy
// or unhealthy. This interface allows for easier testing of the eventProcessor.
type healthReporter interface {
// reportUnhealthy marks the owning component as unhealthy
reportUnhealthy(err error)
// reportHealthy marks the owning component as healthy
reportHealthy()
}

type eventProcessor struct {
queue workqueue.RateLimitingInterface
stopChan chan struct{}
health healthReporter

mimirClient mimirClient.Interface
namespaceLister coreListers.NamespaceLister
ruleLister promListers.PrometheusRuleLister
namespaceSelector labels.Selector
ruleSelector labels.Selector
namespacePrefix string

func (c *Component) eventLoop(ctx context.Context) {
metrics *metrics
logger log.Logger

currentState kubernetes.RuleGroupsByNamespace
currentStateMtx sync.RWMutex
}

// run processes events added to the queue until the queue is shutdown.
func (e *eventProcessor) run(ctx context.Context) {
for {
eventInterface, shutdown := c.queue.Get()
eventInterface, shutdown := e.queue.Get()
if shutdown {
level.Info(c.log).Log("msg", "shutting down event loop")
level.Info(e.logger).Log("msg", "shutting down event loop")
return
}

evt := eventInterface.(kubernetes.Event)
c.metrics.eventsTotal.WithLabelValues(string(evt.Typ)).Inc()
err := c.processEvent(ctx, evt)
e.metrics.eventsTotal.WithLabelValues(string(evt.Typ)).Inc()
err := e.processEvent(ctx, evt)

if err != nil {
retries := c.queue.NumRequeues(evt)
retries := e.queue.NumRequeues(evt)
if retries < 5 {
c.metrics.eventsRetried.WithLabelValues(string(evt.Typ)).Inc()
c.queue.AddRateLimited(evt)
level.Error(c.log).Log(
e.metrics.eventsRetried.WithLabelValues(string(evt.Typ)).Inc()
e.queue.AddRateLimited(evt)
level.Error(e.logger).Log(
"msg", "failed to process event, will retry",
"retries", fmt.Sprintf("%d/5", retries),
"err", err,
)
continue
} else {
c.metrics.eventsFailed.WithLabelValues(string(evt.Typ)).Inc()
level.Error(c.log).Log(
e.metrics.eventsFailed.WithLabelValues(string(evt.Typ)).Inc()
level.Error(e.logger).Log(
"msg", "failed to process event, max retries exceeded",
"retries", fmt.Sprintf("%d/5", retries),
"err", err,
)
c.reportUnhealthy(err)
e.health.reportUnhealthy(err)
}
} else {
c.reportHealthy()
e.health.reportHealthy()
}

c.queue.Forget(evt)
e.queue.Forget(evt)
}
}

func (c *Component) processEvent(ctx context.Context, e kubernetes.Event) error {
defer c.queue.Done(e)
// stop stops adding new Kubernetes events to the queue and blocks until all existing
// events have been processed by the run loop.
func (e *eventProcessor) stop() {
close(e.stopChan)
// Because this method blocks until the queue is empty, it's important that we don't
// stop the run loop and let it continue to process existing items in the queue.
e.queue.ShutDownWithDrain()
}

func (e *eventProcessor) processEvent(ctx context.Context, event kubernetes.Event) error {
defer e.queue.Done(event)

switch e.Typ {
switch event.Typ {
case kubernetes.EventTypeResourceChanged:
level.Info(c.log).Log("msg", "processing event", "type", e.Typ, "key", e.ObjectKey)
level.Info(e.logger).Log("msg", "processing event", "type", event.Typ, "key", event.ObjectKey)
case eventTypeSyncMimir:
level.Debug(c.log).Log("msg", "syncing current state from ruler")
err := c.syncMimir(ctx)
level.Debug(e.logger).Log("msg", "syncing current state from ruler")
err := e.syncMimir(ctx)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown event type: %s", e.Typ)
return fmt.Errorf("unknown event type: %s", event.Typ)
}

return c.reconcileState(ctx)
return e.reconcileState(ctx)
}

func (c *Component) syncMimir(ctx context.Context) error {
rulesByNamespace, err := c.mimirClient.ListRules(ctx, "")
func (e *eventProcessor) enqueueSyncMimir() {
e.queue.Add(kubernetes.Event{
Typ: eventTypeSyncMimir,
})
}

func (e *eventProcessor) syncMimir(ctx context.Context) error {
rulesByNamespace, err := e.mimirClient.ListRules(ctx, "")
if err != nil {
level.Error(c.log).Log("msg", "failed to list rules from mimir", "err", err)
level.Error(e.logger).Log("msg", "failed to list rules from mimir", "err", err)
return err
}

for ns := range rulesByNamespace {
if !isManagedMimirNamespace(c.args.MimirNameSpacePrefix, ns) {
if !isManagedMimirNamespace(e.namespacePrefix, ns) {
delete(rulesByNamespace, ns)
}
}

c.currentState = rulesByNamespace
e.currentStateMtx.Lock()
e.currentState = rulesByNamespace
e.currentStateMtx.Unlock()

return nil
}

func (c *Component) reconcileState(ctx context.Context) error {
func (e *eventProcessor) reconcileState(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

desiredState, err := c.loadStateFromK8s()
desiredState, err := e.desiredStateFromKubernetes()
if err != nil {
return err
}

diffs := kubernetes.DiffRuleState(desiredState, c.currentState)
currentState := e.getMimirState()
diffs := kubernetes.DiffRuleState(desiredState, currentState)

var result error
for ns, diff := range diffs {
err = c.applyChanges(ctx, ns, diff)
err = e.applyChanges(ctx, ns, diff)
if err != nil {
result = multierror.Append(result, err)
continue
Expand All @@ -115,23 +172,19 @@ func (c *Component) reconcileState(ctx context.Context) error {
return result
}

func (c *Component) loadStateFromK8s() (kubernetes.RuleGroupsByNamespace, error) {
matchedNamespaces, err := c.namespaceLister.List(c.namespaceSelector)
// desiredStateFromKubernetes loads PrometheusRule resources from Kubernetes and converts
// them to corresponding Mimir rule groups, indexed by Mimir namespace.
func (e *eventProcessor) desiredStateFromKubernetes() (kubernetes.RuleGroupsByNamespace, error) {
kubernetesState, err := e.getKubernetesState()
if err != nil {
return nil, fmt.Errorf("failed to list namespaces: %w", err)
return nil, err
}

desiredState := make(kubernetes.RuleGroupsByNamespace)
for _, ns := range matchedNamespaces {
crdState, err := c.ruleLister.PrometheusRules(ns.Name).List(c.ruleSelector)
if err != nil {
return nil, fmt.Errorf("failed to list rules: %w", err)
}

for _, pr := range crdState {
mimirNs := mimirNamespaceForRuleCRD(c.args.MimirNameSpacePrefix, pr)

groups, err := convertCRDRuleGroupToRuleGroup(pr.Spec)
for _, rules := range kubernetesState {
for _, rule := range rules {
mimirNs := mimirNamespaceForRuleCRD(e.namespacePrefix, rule)
groups, err := convertCRDRuleGroupToRuleGroup(rule.Spec)
if err != nil {
return nil, fmt.Errorf("failed to convert rule group: %w", err)
}
Expand All @@ -157,38 +210,71 @@ func convertCRDRuleGroupToRuleGroup(crd promv1.PrometheusRuleSpec) ([]rulefmt.Ru
return groups.Groups, nil
}

func (c *Component) applyChanges(ctx context.Context, namespace string, diffs []kubernetes.RuleGroupDiff) error {
func (e *eventProcessor) applyChanges(ctx context.Context, namespace string, diffs []kubernetes.RuleGroupDiff) error {
if len(diffs) == 0 {
return nil
}

for _, diff := range diffs {
switch diff.Kind {
case kubernetes.RuleGroupDiffKindAdd:
err := c.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired)
err := e.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired)
if err != nil {
return err
}
level.Info(c.log).Log("msg", "added rule group", "namespace", namespace, "group", diff.Desired.Name)
level.Info(e.logger).Log("msg", "added rule group", "namespace", namespace, "group", diff.Desired.Name)
case kubernetes.RuleGroupDiffKindRemove:
err := c.mimirClient.DeleteRuleGroup(ctx, namespace, diff.Actual.Name)
err := e.mimirClient.DeleteRuleGroup(ctx, namespace, diff.Actual.Name)
if err != nil {
return err
}
level.Info(c.log).Log("msg", "removed rule group", "namespace", namespace, "group", diff.Actual.Name)
level.Info(e.logger).Log("msg", "removed rule group", "namespace", namespace, "group", diff.Actual.Name)
case kubernetes.RuleGroupDiffKindUpdate:
err := c.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired)
err := e.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired)
if err != nil {
return err
}
level.Info(c.log).Log("msg", "updated rule group", "namespace", namespace, "group", diff.Desired.Name)
level.Info(e.logger).Log("msg", "updated rule group", "namespace", namespace, "group", diff.Desired.Name)
default:
level.Error(c.log).Log("msg", "unknown rule group diff kind", "kind", diff.Kind)
level.Error(e.logger).Log("msg", "unknown rule group diff kind", "kind", diff.Kind)
}
}

// resync mimir state after applying changes
return c.syncMimir(ctx)
return e.syncMimir(ctx)
}

// getMimirState returns the cached Mimir ruler state, rule groups indexed by Mimir namespace.
func (e *eventProcessor) getMimirState() kubernetes.RuleGroupsByNamespace {
e.currentStateMtx.RLock()
defer e.currentStateMtx.RUnlock()

out := make(kubernetes.RuleGroupsByNamespace, len(e.currentState))
for ns, groups := range e.currentState {
out[ns] = groups
}

return out
}

// getKubernetesState returns PrometheusRule resources indexed by Kubernetes namespace.
func (e *eventProcessor) getKubernetesState() (map[string][]*promv1.PrometheusRule, error) {
namespaces, err := e.namespaceLister.List(e.namespaceSelector)
if err != nil {
return nil, fmt.Errorf("failed to list namespaces: %w", err)
}

out := make(map[string][]*promv1.PrometheusRule)
for _, namespace := range namespaces {
rules, err := e.ruleLister.PrometheusRules(namespace.Name).List(e.ruleSelector)
if err != nil {
return nil, fmt.Errorf("failed to list rules: %w", err)
}

out[namespace.Name] = append(out[namespace.Name], rules...)
}

return out, nil
}

// mimirNamespaceForRuleCRD returns the namespace that the rule CRD should be
Expand Down
Loading

0 comments on commit ae4b411

Please sign in to comment.