Skip to content

Commit

Permalink
Loki crds (#3936)
Browse files Browse the repository at this point in the history
* init

* init

* Updated documentation, river validator and defaulter interfaces.

* Fix wrong copy/paste

Signed-off-by: Paschalis Tsilias <[email protected]>

* Remove references to weaveworks repos; add new client and implement updated interface

Signed-off-by: Paschalis Tsilias <[email protected]>

* Fix test

Signed-off-by: Paschalis Tsilias <[email protected]>

* Fix river import path

Signed-off-by: Paschalis Tsilias <[email protected]>

* Apply docs suggestions from code review

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/flow/reference/components/loki.rules.kubernetes.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/flow/reference/components/loki.rules.kubernetes.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/flow/reference/components/loki.rules.kubernetes.md

Co-authored-by: Clayton Cornell <[email protected]>

* use flow logging level instead of go-kit logging level

* add startup backoff to sync with mimir

* fix error msg in test

* update changelog

* add stability featuregate

---------

Signed-off-by: Paschalis Tsilias <[email protected]>
Co-authored-by: Paschalis Tsilias <[email protected]>
Co-authored-by: Paschalis Tsilias <[email protected]>
Co-authored-by: Clayton Cornell <[email protected]>
Co-authored-by: William Dumont <[email protected]>
  • Loading branch information
5 people authored Feb 28, 2024
1 parent 62c1c85 commit 078149d
Show file tree
Hide file tree
Showing 18 changed files with 2,050 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Main (unreleased)
- Added a new CLI flag `--stability.level` which defines the minimum stability
level required for the features that the agent is allowed to use. Default is `experimental`. (@thampiotr)

- A new `loki.rules.kubernetes` component that discovers `PrometheusRule` Kubernetes resources and loads them into a Loki Ruler instance. (@EStork09)

v0.40.0 (2024-02-27)
--------------------
Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
_ "github.com/grafana/agent/component/loki/echo" // Import loki.echo
_ "github.com/grafana/agent/component/loki/process" // Import loki.process
_ "github.com/grafana/agent/component/loki/relabel" // Import loki.relabel
_ "github.com/grafana/agent/component/loki/rules/kubernetes" // Import loki.rules.kubernetes
_ "github.com/grafana/agent/component/loki/source/api" // Import loki.source.api
_ "github.com/grafana/agent/component/loki/source/aws_firehose" // Import loki.source.awsfirehose
_ "github.com/grafana/agent/component/loki/source/azure_event_hubs" // Import loki.source.azure_event_hubs
Expand Down
64 changes: 64 additions & 0 deletions component/loki/rules/kubernetes/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package rules

import "fmt"

type DebugInfo struct {
Error string `river:"error,attr,optional"`
PrometheusRules []DebugK8sPrometheusRule `river:"prometheus_rule,block,optional"`
LokiRuleNamespaces []DebugLokiNamespace `river:"loki_rule_namespace,block,optional"`
}

type DebugK8sPrometheusRule struct {
Namespace string `river:"namespace,attr"`
Name string `river:"name,attr"`
UID string `river:"uid,attr"`
NumRuleGroups int `river:"num_rule_groups,attr"`
}

type DebugLokiNamespace struct {
Name string `river:"name,attr"`
NumRuleGroups int `river:"num_rule_groups,attr"`
}

func (c *Component) DebugInfo() interface{} {
var output DebugInfo
for ns := range c.currentState {
if !isManagedLokiNamespace(c.args.LokiNameSpacePrefix, ns) {
continue
}

output.LokiRuleNamespaces = append(output.LokiRuleNamespaces, DebugLokiNamespace{
Name: ns,
NumRuleGroups: len(c.currentState[ns]),
})
}

// This should load from the informer cache, so it shouldn't fail under normal circumstances.
managedK8sNamespaces, err := c.namespaceLister.List(c.namespaceSelector)
if err != nil {
return DebugInfo{
Error: fmt.Sprintf("failed to list namespaces: %v", err),
}
}

for _, n := range managedK8sNamespaces {
// This should load from the informer cache, so it shouldn't fail under normal circumstances.
rules, err := c.ruleLister.PrometheusRules(n.Name).List(c.ruleSelector)
if err != nil {
return DebugInfo{
Error: fmt.Sprintf("failed to list rules: %v", err),
}
}

for _, r := range rules {
output.PrometheusRules = append(output.PrometheusRules, DebugK8sPrometheusRule{
Namespace: n.Name,
Name: r.Name,
UID: string(r.UID),
NumRuleGroups: len(r.Spec.Groups),
})
}
}

return output
}
113 changes: 113 additions & 0 deletions component/loki/rules/kubernetes/diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package rules

import (
"bytes"

"github.com/prometheus/prometheus/model/rulefmt"
"gopkg.in/yaml.v3" // Used for prometheus rulefmt compatibility instead of gopkg.in/yaml.v2
)

type ruleGroupDiffKind string

const (
ruleGroupDiffKindAdd ruleGroupDiffKind = "add"
ruleGroupDiffKindRemove ruleGroupDiffKind = "remove"
ruleGroupDiffKindUpdate ruleGroupDiffKind = "update"
)

type ruleGroupDiff struct {
Kind ruleGroupDiffKind
Actual rulefmt.RuleGroup
Desired rulefmt.RuleGroup
}

type ruleGroupsByNamespace map[string][]rulefmt.RuleGroup
type ruleGroupDiffsByNamespace map[string][]ruleGroupDiff

func diffRuleState(desired, actual ruleGroupsByNamespace) ruleGroupDiffsByNamespace {
seenNamespaces := map[string]bool{}

diff := make(ruleGroupDiffsByNamespace)

for namespace, desiredRuleGroups := range desired {
seenNamespaces[namespace] = true

actualRuleGroups := actual[namespace]
subDiff := diffRuleNamespaceState(desiredRuleGroups, actualRuleGroups)

if len(subDiff) == 0 {
continue
}

diff[namespace] = subDiff
}

for namespace, actualRuleGroups := range actual {
if seenNamespaces[namespace] {
continue
}

subDiff := diffRuleNamespaceState(nil, actualRuleGroups)

diff[namespace] = subDiff
}

return diff
}

func diffRuleNamespaceState(desired []rulefmt.RuleGroup, actual []rulefmt.RuleGroup) []ruleGroupDiff {
var diff []ruleGroupDiff

seenGroups := map[string]bool{}

desiredGroups:
for _, desiredRuleGroup := range desired {
seenGroups[desiredRuleGroup.Name] = true

for _, actualRuleGroup := range actual {
if desiredRuleGroup.Name == actualRuleGroup.Name {
if equalRuleGroups(desiredRuleGroup, actualRuleGroup) {
continue desiredGroups
}

diff = append(diff, ruleGroupDiff{
Kind: ruleGroupDiffKindUpdate,
Actual: actualRuleGroup,
Desired: desiredRuleGroup,
})
continue desiredGroups
}
}

diff = append(diff, ruleGroupDiff{
Kind: ruleGroupDiffKindAdd,
Desired: desiredRuleGroup,
})
}

for _, actualRuleGroup := range actual {
if seenGroups[actualRuleGroup.Name] {
continue
}

diff = append(diff, ruleGroupDiff{
Kind: ruleGroupDiffKindRemove,
Actual: actualRuleGroup,
})
}

return diff
}

func equalRuleGroups(a, b rulefmt.RuleGroup) bool {
aBuf, err := yaml.Marshal(a)
if err != nil {
return false
}
bBuf, err := yaml.Marshal(b)
if err != nil {
return false
}

return bytes.Equal(aBuf, bBuf)
}
157 changes: 157 additions & 0 deletions component/loki/rules/kubernetes/diff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package rules

import (
"fmt"
"testing"

"github.com/prometheus/prometheus/model/rulefmt"
"github.com/stretchr/testify/require"
)

func parseRuleGroups(t *testing.T, buf []byte) []rulefmt.RuleGroup {
t.Helper()

groups, errs := rulefmt.Parse(buf)
require.Empty(t, errs)

return groups.Groups
}

func TestDiffRuleState(t *testing.T) {
ruleGroupsA := parseRuleGroups(t, []byte(`
groups:
- name: rule-group-a
interval: 1m
rules:
- record: rule_a
expr: 1
`))

ruleGroupsAModified := parseRuleGroups(t, []byte(`
groups:
- name: rule-group-a
interval: 1m
rules:
- record: rule_a
expr: 3
`))

managedNamespace := "agent/namespace/name/12345678-1234-1234-1234-123456789012"

type testCase struct {
name string
desired map[string][]rulefmt.RuleGroup
actual map[string][]rulefmt.RuleGroup
expected map[string][]ruleGroupDiff
}

testCases := []testCase{
{
name: "empty sets",
desired: map[string][]rulefmt.RuleGroup{},
actual: map[string][]rulefmt.RuleGroup{},
expected: map[string][]ruleGroupDiff{},
},
{
name: "add rule group",
desired: map[string][]rulefmt.RuleGroup{
managedNamespace: ruleGroupsA,
},
actual: map[string][]rulefmt.RuleGroup{},
expected: map[string][]ruleGroupDiff{
managedNamespace: {
{
Kind: ruleGroupDiffKindAdd,
Desired: ruleGroupsA[0],
},
},
},
},
{
name: "remove rule group",
desired: map[string][]rulefmt.RuleGroup{},
actual: map[string][]rulefmt.RuleGroup{
managedNamespace: ruleGroupsA,
},
expected: map[string][]ruleGroupDiff{
managedNamespace: {
{
Kind: ruleGroupDiffKindRemove,
Actual: ruleGroupsA[0],
},
},
},
},
{
name: "update rule group",
desired: map[string][]rulefmt.RuleGroup{
managedNamespace: ruleGroupsA,
},
actual: map[string][]rulefmt.RuleGroup{
managedNamespace: ruleGroupsAModified,
},
expected: map[string][]ruleGroupDiff{
managedNamespace: {
{
Kind: ruleGroupDiffKindUpdate,
Desired: ruleGroupsA[0],
Actual: ruleGroupsAModified[0],
},
},
},
},
{
name: "unchanged rule groups",
desired: map[string][]rulefmt.RuleGroup{
managedNamespace: ruleGroupsA,
},
actual: map[string][]rulefmt.RuleGroup{
managedNamespace: ruleGroupsA,
},
expected: map[string][]ruleGroupDiff{},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := diffRuleState(tc.desired, tc.actual)
requireEqualRuleDiffs(t, tc.expected, actual)
})
}
}

func requireEqualRuleDiffs(t *testing.T, expected, actual map[string][]ruleGroupDiff) {
require.Equal(t, len(expected), len(actual))

var summarizeDiff = func(diff ruleGroupDiff) string {
switch diff.Kind {
case ruleGroupDiffKindAdd:
return fmt.Sprintf("add: %s", diff.Desired.Name)
case ruleGroupDiffKindRemove:
return fmt.Sprintf("remove: %s", diff.Actual.Name)
case ruleGroupDiffKindUpdate:
return fmt.Sprintf("update: %s", diff.Desired.Name)
}
panic("unreachable")
}

for namespace, expectedDiffs := range expected {
actualDiffs, ok := actual[namespace]
require.True(t, ok)

require.Equal(t, len(expectedDiffs), len(actualDiffs))

for i, expectedDiff := range expectedDiffs {
actualDiff := actualDiffs[i]

if expectedDiff.Kind != actualDiff.Kind ||
!equalRuleGroups(expectedDiff.Desired, actualDiff.Desired) ||
!equalRuleGroups(expectedDiff.Actual, actualDiff.Actual) {

t.Logf("expected diff: %s", summarizeDiff(expectedDiff))
t.Logf("actual diff: %s", summarizeDiff(actualDiff))
t.Fail()
}
}
}
}
Loading

0 comments on commit 078149d

Please sign in to comment.