From 25a6902393bbc399355627949fd3fec207c65437 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Tue, 17 Dec 2024 10:16:53 +0200 Subject: [PATCH] Fixes to summation1 and summation2 * summation1 only sends during run() * summation2 tracks the sum via metrics instead of an export --- internal/runtime/import_test.go | 33 ++++++----- .../internal/testcomponents/sumation1.go | 25 ++++----- .../internal/testcomponents/sumation2.go | 56 ++++++++++--------- .../runtime/testdata/foreach/foreach_1.txtar | 2 +- 4 files changed, 64 insertions(+), 52 deletions(-) diff --git a/internal/runtime/import_test.go b/internal/runtime/import_test.go index 9220a1c600..c708ef5675 100644 --- a/internal/runtime/import_test.go +++ b/internal/runtime/import_test.go @@ -16,6 +16,9 @@ import ( "github.com/grafana/alloy/internal/runtime/logging" "github.com/grafana/alloy/internal/service" "github.com/grafana/alloy/internal/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/tools/txtar" @@ -93,9 +96,10 @@ func buildTestImportFile(t *testing.T, filename string) testImportFile { func TestForeach(t *testing.T) { directory := "./testdata/foreach" for _, file := range getTestFiles(directory, t) { + reg := prometheus.NewRegistry() tc := buildTestImportFile(t, filepath.Join(directory, file.Name())) t.Run(tc.description, func(t *testing.T) { - testConfig2(t, tc.main, tc.reloadConfig, nil) + testConfig2(t, reg, tc.main, tc.reloadConfig, nil) }) } } @@ -320,7 +324,7 @@ func TestImportError(t *testing.T) { func testConfig(t *testing.T, config string, reloadConfig string, update func()) { defer verifyNoGoroutineLeaks(t) - ctrl, f := setup(t, config) + ctrl, f := setup(t, config, nil) err := ctrl.LoadSource(f, nil, "") require.NoError(t, err) @@ -372,9 +376,9 @@ func testConfig(t *testing.T, config string, reloadConfig string, update func()) } // This function is a copy of testConfig above. -func testConfig2(t *testing.T, config string, reloadConfig string, update func()) { +func testConfig2(t *testing.T, reg *prometheus.Registry, config string, reloadConfig string, update func()) { defer verifyNoGoroutineLeaks(t) - ctrl, f := setup(t, config) + ctrl, f := setup(t, config, reg) err := ctrl.LoadSource(f, nil, "") require.NoError(t, err) @@ -393,12 +397,15 @@ func testConfig2(t *testing.T, config string, reloadConfig string, update func() }() // Check for initial condition - require.Eventually(t, func() bool { - export := getExport[testcomponents.SummationExports_2](t, ctrl, "", "testcomponents.summation2.final") - // If each iteration of the for loop adds a 1, - // and there are 4 iterations, we expect 4 to be the end result. - //TODO: Make the expected "sum" value configurable? - return export.Sum == 4 + require.EventuallyWithT(t, func(c *assert.CollectT) { + expectedMetrics := ` +# HELP testcomponents_summation2 Summation of all integers received +# TYPE testcomponents_summation2 counter +testcomponents_summation2{component_id="testcomponents.summation2.final",component_path="/"} 1 +` + if err := testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "testcomponents_summation2"); err != nil { + c.Errorf("mismatch metrics: %v", err) + } }, 3*time.Second, 10*time.Millisecond) // if update != nil { @@ -430,7 +437,7 @@ func testConfig2(t *testing.T, config string, reloadConfig string, update func() func testConfigError(t *testing.T, config string, expectedError string) { defer verifyNoGoroutineLeaks(t) - ctrl, f := setup(t, config) + ctrl, f := setup(t, config, nil) err := ctrl.LoadSource(f, nil, "") require.ErrorContains(t, err, expectedError) ctx, cancel := context.WithCancel(context.Background()) @@ -447,14 +454,14 @@ func testConfigError(t *testing.T, config string, expectedError string) { }() } -func setup(t *testing.T, config string) (*alloy_runtime.Runtime, *alloy_runtime.Source) { +func setup(t *testing.T, config string, reg prometheus.Registerer) (*alloy_runtime.Runtime, *alloy_runtime.Source) { s, err := logging.New(os.Stderr, logging.DefaultOptions) require.NoError(t, err) ctrl := alloy_runtime.New(alloy_runtime.Options{ Logger: s, DataPath: t.TempDir(), MinStability: featuregate.StabilityPublicPreview, - Reg: nil, + Reg: reg, Services: []service.Service{}, }) f, err := alloy_runtime.ParseSource(t.Name(), []byte(config)) diff --git a/internal/runtime/internal/testcomponents/sumation1.go b/internal/runtime/internal/testcomponents/sumation1.go index a1246b6993..63af56370e 100644 --- a/internal/runtime/internal/testcomponents/sumation1.go +++ b/internal/runtime/internal/testcomponents/sumation1.go @@ -23,8 +23,7 @@ func init() { // Accepts a single integer input and forwards it to all the components listed in forward_to. type SummationConfig_Entry struct { - Input int `alloy:"input,attr"` - //TODO: What should the type be? + Input int `alloy:"input,attr"` ForwardTo []IntReceiver `alloy:"forward_to,attr"` } @@ -34,15 +33,16 @@ type SummationExports_Entry struct { type Summation_Entry struct { opts component.Options log log.Logger + cfg SummationConfig_Entry } // NewSummation creates a new summation component. func NewSummation_Entry(o component.Options, cfg SummationConfig_Entry) (*Summation_Entry, error) { - t := &Summation_Entry{opts: o, log: o.Logger} - if err := t.Update(cfg); err != nil { - return nil, err - } - return t, nil + return &Summation_Entry{ + opts: o, + log: o.Logger, + cfg: cfg, + }, nil } var ( @@ -51,17 +51,16 @@ var ( // Run implements Component. func (t *Summation_Entry) Run(ctx context.Context) error { + for _, r := range t.cfg.ForwardTo { + r.ReceiveInt(t.cfg.Input) + } + <-ctx.Done() return nil } // Update implements Component. func (t *Summation_Entry) Update(args component.Arguments) error { - c := args.(SummationConfig_Entry) - - for _, r := range c.ForwardTo { - r.ReceiveInt(c.Input) - } - + // TODO: Implement this? return nil } diff --git a/internal/runtime/internal/testcomponents/sumation2.go b/internal/runtime/internal/testcomponents/sumation2.go index 2ca1ee1653..2c6e8ad2eb 100644 --- a/internal/runtime/internal/testcomponents/sumation2.go +++ b/internal/runtime/internal/testcomponents/sumation2.go @@ -6,7 +6,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/featuregate" - "go.uber.org/atomic" + "github.com/prometheus/client_golang/prometheus" ) func init() { @@ -17,7 +17,7 @@ func init() { Exports: SummationExports_2{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { - return NewSummation_2(opts, args.(SummationConfig_2)) + return NewSummation_2(opts) }, }) } @@ -27,52 +27,54 @@ type IntReceiver interface { } type IntReceiverImpl struct { - sum atomic.Int32 - updateSumExport func(int) + incrementSum func(int) } func (r IntReceiverImpl) ReceiveInt(i int) { - new := r.sum.Add(int32(i)) - r.updateSumExport(int(new)) + r.incrementSum(i) } type SummationConfig_2 struct { } type SummationExports_2 struct { - Receiver IntReceiver `alloy:"receiver,attr"` - Sum int `alloy:"sum,attr"` - LastAdded int `alloy:"last_added,attr"` + Receiver IntReceiver `alloy:"receiver,attr"` } type Summation_2 struct { - opts component.Options - log log.Logger + opts component.Options + log log.Logger + + reg prometheus.Registerer + counter prometheus.Counter receiver IntReceiver } // NewSummation creates a new summation component. -func NewSummation_2(o component.Options, cfg SummationConfig_2) (*Summation_2, error) { - recv := IntReceiverImpl{} - - recv.updateSumExport = func(newSum int) { - o.Logger.Log("msg", "Summation_2: new sum", "sum", newSum) - o.OnStateChange(SummationExports_2{ - Receiver: recv, - Sum: newSum, - }) - } - - o.OnStateChange(SummationExports_2{ - Receiver: recv, +func NewSummation_2(o component.Options) (*Summation_2, error) { + counter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "testcomponents_summation2", + Help: "Summation of all integers received", }) + recv := IntReceiverImpl{ + incrementSum: func(i int) { + counter.Add(float64(i)) + }, + } + t := &Summation_2{ opts: o, log: o.Logger, receiver: recv, + reg: o.Registerer, + counter: counter, } + o.OnStateChange(SummationExports_2{ + Receiver: t.receiver, + }) + return t, nil } @@ -82,12 +84,16 @@ var ( // Run implements Component. func (t *Summation_2) Run(ctx context.Context) error { + if err := t.reg.Register(t.counter); err != nil { + return err + } + defer t.reg.Unregister(t.counter) + <-ctx.Done() return nil } // Update implements Component. func (t *Summation_2) Update(args component.Arguments) error { - return nil } diff --git a/internal/runtime/testdata/foreach/foreach_1.txtar b/internal/runtime/testdata/foreach/foreach_1.txtar index a8a1c11037..365d9df883 100644 --- a/internal/runtime/testdata/foreach/foreach_1.txtar +++ b/internal/runtime/testdata/foreach/foreach_1.txtar @@ -1,6 +1,6 @@ -- main.alloy -- foreach "testForeach" { - collection = [1, 2, 3, 4] + collection = [1] template { // Similar to testcomponents.summation, but with a "forward_to"