Skip to content

Commit

Permalink
Fixes to summation1 and summation2
Browse files Browse the repository at this point in the history
* summation1 only sends during run()
* summation2 tracks the sum via metrics instead of an export
  • Loading branch information
ptodev committed Dec 17, 2024
1 parent d1591b1 commit 25a6902
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 52 deletions.
33 changes: 20 additions & 13 deletions internal/runtime/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/internal/service"
"github.com/grafana/alloy/internal/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/tools/txtar"

Expand Down Expand Up @@ -93,9 +96,10 @@ func buildTestImportFile(t *testing.T, filename string) testImportFile {
func TestForeach(t *testing.T) {
directory := "./testdata/foreach"
for _, file := range getTestFiles(directory, t) {
reg := prometheus.NewRegistry()
tc := buildTestImportFile(t, filepath.Join(directory, file.Name()))
t.Run(tc.description, func(t *testing.T) {
testConfig2(t, tc.main, tc.reloadConfig, nil)
testConfig2(t, reg, tc.main, tc.reloadConfig, nil)
})
}
}
Expand Down Expand Up @@ -320,7 +324,7 @@ func TestImportError(t *testing.T) {

func testConfig(t *testing.T, config string, reloadConfig string, update func()) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)
ctrl, f := setup(t, config, nil)

err := ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
Expand Down Expand Up @@ -372,9 +376,9 @@ func testConfig(t *testing.T, config string, reloadConfig string, update func())
}

// This function is a copy of testConfig above.
func testConfig2(t *testing.T, config string, reloadConfig string, update func()) {
func testConfig2(t *testing.T, reg *prometheus.Registry, config string, reloadConfig string, update func()) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)
ctrl, f := setup(t, config, reg)

err := ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
Expand All @@ -393,12 +397,15 @@ func testConfig2(t *testing.T, config string, reloadConfig string, update func()
}()

// Check for initial condition
require.Eventually(t, func() bool {
export := getExport[testcomponents.SummationExports_2](t, ctrl, "", "testcomponents.summation2.final")
// If each iteration of the for loop adds a 1,
// and there are 4 iterations, we expect 4 to be the end result.
//TODO: Make the expected "sum" value configurable?
return export.Sum == 4
require.EventuallyWithT(t, func(c *assert.CollectT) {
expectedMetrics := `
# HELP testcomponents_summation2 Summation of all integers received
# TYPE testcomponents_summation2 counter
testcomponents_summation2{component_id="testcomponents.summation2.final",component_path="/"} 1
`
if err := testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "testcomponents_summation2"); err != nil {
c.Errorf("mismatch metrics: %v", err)
}
}, 3*time.Second, 10*time.Millisecond)

// if update != nil {
Expand Down Expand Up @@ -430,7 +437,7 @@ func testConfig2(t *testing.T, config string, reloadConfig string, update func()

func testConfigError(t *testing.T, config string, expectedError string) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)
ctrl, f := setup(t, config, nil)
err := ctrl.LoadSource(f, nil, "")
require.ErrorContains(t, err, expectedError)
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -447,14 +454,14 @@ func testConfigError(t *testing.T, config string, expectedError string) {
}()
}

func setup(t *testing.T, config string) (*alloy_runtime.Runtime, *alloy_runtime.Source) {
func setup(t *testing.T, config string, reg prometheus.Registerer) (*alloy_runtime.Runtime, *alloy_runtime.Source) {
s, err := logging.New(os.Stderr, logging.DefaultOptions)
require.NoError(t, err)
ctrl := alloy_runtime.New(alloy_runtime.Options{
Logger: s,
DataPath: t.TempDir(),
MinStability: featuregate.StabilityPublicPreview,
Reg: nil,
Reg: reg,
Services: []service.Service{},
})
f, err := alloy_runtime.ParseSource(t.Name(), []byte(config))
Expand Down
25 changes: 12 additions & 13 deletions internal/runtime/internal/testcomponents/sumation1.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ func init() {

// Accepts a single integer input and forwards it to all the components listed in forward_to.
type SummationConfig_Entry struct {
Input int `alloy:"input,attr"`
//TODO: What should the type be?
Input int `alloy:"input,attr"`
ForwardTo []IntReceiver `alloy:"forward_to,attr"`
}

Expand All @@ -34,15 +33,16 @@ type SummationExports_Entry struct {
type Summation_Entry struct {
opts component.Options
log log.Logger
cfg SummationConfig_Entry
}

// NewSummation creates a new summation component.
func NewSummation_Entry(o component.Options, cfg SummationConfig_Entry) (*Summation_Entry, error) {
t := &Summation_Entry{opts: o, log: o.Logger}
if err := t.Update(cfg); err != nil {
return nil, err
}
return t, nil
return &Summation_Entry{
opts: o,
log: o.Logger,
cfg: cfg,
}, nil
}

var (
Expand All @@ -51,17 +51,16 @@ var (

// Run implements Component.
func (t *Summation_Entry) Run(ctx context.Context) error {
for _, r := range t.cfg.ForwardTo {
r.ReceiveInt(t.cfg.Input)
}

<-ctx.Done()
return nil
}

// Update implements Component.
func (t *Summation_Entry) Update(args component.Arguments) error {
c := args.(SummationConfig_Entry)

for _, r := range c.ForwardTo {
r.ReceiveInt(c.Input)
}

// TODO: Implement this?
return nil
}
56 changes: 31 additions & 25 deletions internal/runtime/internal/testcomponents/sumation2.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/featuregate"
"go.uber.org/atomic"
"github.com/prometheus/client_golang/prometheus"
)

func init() {
Expand All @@ -17,7 +17,7 @@ func init() {
Exports: SummationExports_2{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return NewSummation_2(opts, args.(SummationConfig_2))
return NewSummation_2(opts)
},
})
}
Expand All @@ -27,52 +27,54 @@ type IntReceiver interface {
}

type IntReceiverImpl struct {
sum atomic.Int32
updateSumExport func(int)
incrementSum func(int)
}

func (r IntReceiverImpl) ReceiveInt(i int) {
new := r.sum.Add(int32(i))
r.updateSumExport(int(new))
r.incrementSum(i)
}

type SummationConfig_2 struct {
}

type SummationExports_2 struct {
Receiver IntReceiver `alloy:"receiver,attr"`
Sum int `alloy:"sum,attr"`
LastAdded int `alloy:"last_added,attr"`
Receiver IntReceiver `alloy:"receiver,attr"`
}

type Summation_2 struct {
opts component.Options
log log.Logger
opts component.Options
log log.Logger

reg prometheus.Registerer
counter prometheus.Counter
receiver IntReceiver
}

// NewSummation creates a new summation component.
func NewSummation_2(o component.Options, cfg SummationConfig_2) (*Summation_2, error) {
recv := IntReceiverImpl{}

recv.updateSumExport = func(newSum int) {
o.Logger.Log("msg", "Summation_2: new sum", "sum", newSum)
o.OnStateChange(SummationExports_2{
Receiver: recv,
Sum: newSum,
})
}

o.OnStateChange(SummationExports_2{
Receiver: recv,
func NewSummation_2(o component.Options) (*Summation_2, error) {
counter := prometheus.NewCounter(prometheus.CounterOpts{
Name: "testcomponents_summation2",
Help: "Summation of all integers received",
})

recv := IntReceiverImpl{
incrementSum: func(i int) {
counter.Add(float64(i))
},
}

t := &Summation_2{
opts: o,
log: o.Logger,
receiver: recv,
reg: o.Registerer,
counter: counter,
}

o.OnStateChange(SummationExports_2{
Receiver: t.receiver,
})

return t, nil
}

Expand All @@ -82,12 +84,16 @@ var (

// Run implements Component.
func (t *Summation_2) Run(ctx context.Context) error {
if err := t.reg.Register(t.counter); err != nil {
return err
}
defer t.reg.Unregister(t.counter)

<-ctx.Done()
return nil
}

// Update implements Component.
func (t *Summation_2) Update(args component.Arguments) error {

return nil
}
2 changes: 1 addition & 1 deletion internal/runtime/testdata/foreach/foreach_1.txtar
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- main.alloy --
foreach "testForeach" {
collection = [1, 2, 3, 4]
collection = [1]

template {
// Similar to testcomponents.summation, but with a "forward_to"
Expand Down

0 comments on commit 25a6902

Please sign in to comment.