Skip to content

Commit

Permalink
rework foreach txtar tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed Jan 9, 2025
1 parent ca1771c commit 175062a
Show file tree
Hide file tree
Showing 15 changed files with 454 additions and 238 deletions.
138 changes: 94 additions & 44 deletions internal/runtime/foreach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,76 @@ package runtime_test

import (
"context"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/runtime"
alloy_runtime "github.com/grafana/alloy/internal/runtime"
"github.com/stretchr/testify/require"
"k8s.io/component-base/metrics/testutil"
"golang.org/x/tools/txtar"
)

// TODO: Test a foreach inside a foreach.
// TODO: Test foreach with clustering.
func TestForeach(t *testing.T) {
directory := "./testdata/foreach"
for _, file := range getTestFiles(directory, t) {
reg := prometheus.NewRegistry()
tc := buildTestImportFile(t, filepath.Join(directory, file.Name()))
tc := buildTestForEach(t, filepath.Join(directory, file.Name()))
t.Run(tc.description, func(t *testing.T) {
testConfigForEach(t, reg, tc.main, tc.reloadConfig, nil)
if tc.module != "" {
defer os.Remove("module.alloy")
require.NoError(t, os.WriteFile("module.alloy", []byte(tc.module), 0664))
}
if tc.update != nil {
testConfigForEach(t, tc.main, tc.reloadConfig, func() {
require.NoError(t, os.WriteFile(tc.update.name, []byte(tc.update.updateConfig), 0664))
})
} else {
testConfigForEach(t, tc.main, tc.reloadConfig, nil)
}
})
}
}

func testConfigForEach(t *testing.T, reg *prometheus.Registry, config string, reloadConfig string, update func()) {
type testForEachFile struct {
description string // description at the top of the txtar file
main string // root config that the controller should load
module string // module imported by the root config
reloadConfig string // root config that the controller should apply on reload
update *updateFile // update can be used to update the content of a file at runtime
}

func buildTestForEach(t *testing.T, filename string) testForEachFile {
archive, err := txtar.ParseFile(filename)
require.NoError(t, err)
var tc testForEachFile
tc.description = string(archive.Comment)
for _, alloyConfig := range archive.Files {
switch alloyConfig.Name {
case mainFile:
tc.main = string(alloyConfig.Data)
case "module.alloy":
tc.module = string(alloyConfig.Data)
case "update/module.alloy":
require.Nil(t, tc.update)
tc.update = &updateFile{
name: "module.alloy",
updateConfig: string(alloyConfig.Data),
}
case "reload_config.alloy":
tc.reloadConfig = string(alloyConfig.Data)
}
}
return tc
}

func testConfigForEach(t *testing.T, config string, reloadConfig string, update func()) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config, reg)
ctrl, f := setup(t, config)

err := ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
Expand All @@ -47,41 +89,49 @@ func testConfigForEach(t *testing.T, reg *prometheus.Registry, config string, re
ctrl.Run(ctx)
}()

// Check for initial condition
require.EventuallyWithT(t, func(c *assert.CollectT) {
expectedMetrics := `
# HELP testcomponents_summation2 Summation of all integers received
# TYPE testcomponents_summation2 counter
testcomponents_summation2{component_id="testcomponents.summation2.final",component_path="/"} 2
`
if err := testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "testcomponents_summation2_total"); err != nil {
c.Errorf("mismatch metrics: %v", err)
}
require.Eventually(t, func() bool {
sum := getDebugInfo[int](t, ctrl, "", "testcomponents.summation_receiver.sum")
return sum >= 10
}, 3*time.Second, 10*time.Millisecond)

// if update != nil {
// update()

// // Export should be -10 after update
// require.Eventually(t, func() bool {
// export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
// return export.LastAdded <= -10
// }, 3*time.Second, 10*time.Millisecond)
// }

// if reloadConfig != "" {
// f, err = alloy_runtime.ParseSource(t.Name(), []byte(reloadConfig))
// require.NoError(t, err)
// require.NotNil(t, f)

// // Reload the controller with the new config.
// err = ctrl.LoadSource(f, nil)
// require.NoError(t, err)

// // Export should be -10 after update
// require.Eventually(t, func() bool {
// export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
// return export.LastAdded <= -10
// }, 3*time.Second, 10*time.Millisecond)
// }
if update != nil {
update()

// Sum should be 30 after update
require.Eventually(t, func() bool {
sum := getDebugInfo[int](t, ctrl, "", "testcomponents.summation_receiver.sum")
return sum >= 30
}, 3*time.Second, 10*time.Millisecond)
}

if reloadConfig != "" {
f, err = alloy_runtime.ParseSource(t.Name(), []byte(reloadConfig))
require.NoError(t, err)
require.NotNil(t, f)

// Reload the controller with the new config.
err = ctrl.LoadSource(f, nil, "")
require.NoError(t, err)

// Sum should be 30 after update
require.Eventually(t, func() bool {
sum := getDebugInfo[int](t, ctrl, "", "testcomponents.summation_receiver.sum")
return sum >= 30
}, 3*time.Second, 10*time.Millisecond)
}
}

func getDebugInfo[T any](t *testing.T, ctrl *runtime.Runtime, moduleId string, nodeId string) T {
t.Helper()
info, err := ctrl.GetComponent(component.ID{
ModuleID: moduleId,
LocalID: nodeId,
}, component.InfoOptions{
GetHealth: true,
GetArguments: true,
GetExports: true,
GetDebugInfo: true,
})
require.NoError(t, err)
return info.DebugInfo.(T)
}
9 changes: 4 additions & 5 deletions internal/runtime/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/internal/service"
"github.com/grafana/alloy/internal/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"golang.org/x/tools/txtar"

Expand Down Expand Up @@ -302,7 +301,7 @@ func TestImportError(t *testing.T) {

func testConfig(t *testing.T, config string, reloadConfig string, update func()) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config, nil)
ctrl, f := setup(t, config)

err := ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
Expand Down Expand Up @@ -355,7 +354,7 @@ func testConfig(t *testing.T, config string, reloadConfig string, update func())

func testConfigError(t *testing.T, config string, expectedError string) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config, nil)
ctrl, f := setup(t, config)
err := ctrl.LoadSource(f, nil, "")
require.ErrorContains(t, err, expectedError)
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -372,14 +371,14 @@ func testConfigError(t *testing.T, config string, expectedError string) {
}()
}

func setup(t *testing.T, config string, reg prometheus.Registerer) (*alloy_runtime.Runtime, *alloy_runtime.Source) {
func setup(t *testing.T, config string) (*alloy_runtime.Runtime, *alloy_runtime.Source) {
s, err := logging.New(os.Stderr, logging.DefaultOptions)
require.NoError(t, err)
ctrl := alloy_runtime.New(alloy_runtime.Options{
Logger: s,
DataPath: t.TempDir(),
MinStability: featuregate.StabilityPublicPreview,
Reg: reg,
Reg: nil,
Services: []service.Service{},
})
f, err := alloy_runtime.ParseSource(t.Name(), []byte(config))
Expand Down
4 changes: 4 additions & 0 deletions internal/runtime/internal/controller/node_config_foreach.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"fmt"
"hash/fnv"
"path"
"sync"

"github.com/grafana/alloy/internal/runner"
Expand Down Expand Up @@ -41,6 +42,9 @@ type ForeachArguments struct {
func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *ForeachConfigNode {
nodeID := BlockComponentID(block).String()
globalID := nodeID
if globals.ControllerID != "" {
globalID = path.Join(globals.ControllerID, nodeID)
}

return &ForeachConfigNode{
nodeID: nodeID,
Expand Down
15 changes: 9 additions & 6 deletions internal/runtime/internal/testcomponents/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func init() {
type CountConfig struct {
Frequency time.Duration `alloy:"frequency,attr"`
Max int `alloy:"max,attr"`
ForwardTo []IntReceiver `alloy:"forward_to,attr,optional"`
}

type CountExports struct {
Expand Down Expand Up @@ -63,18 +64,20 @@ func (t *Count) Run(ctx context.Context) error {
return nil
case <-time.After(t.getNextCount()):
t.cfgMut.Lock()
maxCount := t.cfg.Max
t.cfgMut.Unlock()

currentCount := t.count.Load()
if maxCount == 0 || currentCount < int32(maxCount) {
if t.cfg.Max == 0 || currentCount < int32(t.cfg.Max) {
if t.count.CompareAndSwap(currentCount, currentCount+1) {
level.Info(t.log).Log("msg", "incremented count", "count", currentCount+1)
t.opts.OnStateChange(CountExports{Count: int(currentCount + 1)})
newCount := int(currentCount + 1)
level.Info(t.log).Log("msg", "incremented count", "count", newCount)
t.opts.OnStateChange(CountExports{Count: newCount})
for _, r := range t.cfg.ForwardTo {
r.ReceiveInt(newCount)
}
} else {
level.Info(t.log).Log("msg", "failed to increment count", "count", currentCount)
}
}
t.cfgMut.Unlock()
}
}
}
Expand Down
91 changes: 91 additions & 0 deletions internal/runtime/internal/testcomponents/pulse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package testcomponents

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
)

func init() {
component.Register(component.Registration{
Name: "testcomponents.pulse",
Stability: featuregate.StabilityPublicPreview,
Args: PulseConfig{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return NewPulse(opts, args.(PulseConfig))
},
})
}

type PulseConfig struct {
Frequency time.Duration `alloy:"frequency,attr"`
Max int `alloy:"max,attr"`
ForwardTo []IntReceiver `alloy:"forward_to,attr,optional"`
}

type Pulse struct {
opts component.Options
log log.Logger

cfgMut sync.Mutex
cfg PulseConfig
count int
}

func NewPulse(o component.Options, cfg PulseConfig) (*Pulse, error) {
t := &Pulse{opts: o, log: o.Logger}
if err := t.Update(cfg); err != nil {
return nil, err
}
return t, nil
}

var (
_ component.Component = (*Pulse)(nil)
)

func (p *Pulse) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case <-time.After(p.getNextPulse()):
p.cfgMut.Lock()
if p.cfg.Max == 0 || p.count < p.cfg.Max {
for _, r := range p.cfg.ForwardTo {
r.ReceiveInt(1)
}
p.count++
}
p.cfgMut.Unlock()
}
}
}

func (t *Pulse) getNextPulse() time.Duration {
t.cfgMut.Lock()
defer t.cfgMut.Unlock()
return t.cfg.Frequency
}

// Update implements Component.
func (t *Pulse) Update(args component.Arguments) error {
t.cfgMut.Lock()
defer t.cfgMut.Unlock()

cfg := args.(PulseConfig)
if cfg.Frequency == 0 {
return fmt.Errorf("frequency must not be 0")
}

level.Info(t.log).Log("msg", "setting count frequency", "freq", cfg.Frequency)
t.cfg = cfg
return nil
}
Loading

0 comments on commit 175062a

Please sign in to comment.