diff --git a/internal/runtime/import_test.go b/internal/runtime/import_test.go index a9134d8e43..9220a1c600 100644 --- a/internal/runtime/import_test.go +++ b/internal/runtime/import_test.go @@ -83,6 +83,13 @@ func buildTestImportFile(t *testing.T, filename string) testImportFile { // This is a copy of TestImportFile. // It may need to be modified further to make it work with a foreach. +// +// TODO: Why does this test fail? It seems to be running multiple times, +// then fails on the final retry since it's interrupted by a shutdown +// as indicated in the "node exited without error" log message. +// +// TODO: Test a foreach inside a foreach. +// TODO: Test foreach with clustering. func TestForeach(t *testing.T) { directory := "./testdata/foreach" for _, file := range getTestFiles(directory, t) { @@ -132,6 +139,9 @@ func TestImportFile(t *testing.T) { } } +// TODO: Why does this test fail? It seems to be running multiple times, +// then fails on the final retry since it's interrupted by a shutdown +// as indicated in the "node exited without error" log message. func TestImportString(t *testing.T) { directory := "./testdata/import_string" for _, file := range getTestFiles(directory, t) { @@ -386,9 +396,9 @@ func testConfig2(t *testing.T, config string, reloadConfig string, update func() require.Eventually(t, func() bool { export := getExport[testcomponents.SummationExports_2](t, ctrl, "", "testcomponents.summation2.final") // If each iteration of the for loop adds a 1, - // and there are 3 iterations, we expect 3 to be the end result. - //TODO: Make this configurable? - return export.Sum == 3 + // and there are 4 iterations, we expect 4 to be the end result. + //TODO: Make the expected "sum" value configurable? + return export.Sum == 4 }, 3*time.Second, 10*time.Millisecond) // if update != nil { diff --git a/internal/runtime/internal/controller/node_config_foreach.go b/internal/runtime/internal/controller/node_config_foreach.go index f27553832d..1619df4f6c 100644 --- a/internal/runtime/internal/controller/node_config_foreach.go +++ b/internal/runtime/internal/controller/node_config_foreach.go @@ -1,35 +1,41 @@ package controller import ( + "context" + "fmt" + "sync" + "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" ) type ForeachConfigNode struct { - nodeID string - label string - block *ast.BlockStmt // Current Alloy blocks to derive config from + nodeID string + label string + block *ast.BlockStmt // Current Alloy blocks to derive config from + moduleController ModuleController + customComponents []CustomComponent } var _ BlockNode = (*ForeachConfigNode)(nil) +var _ RunnableNode = (*ForeachConfigNode)(nil) // For now the Foreach doesn't have the ability to export arguments. //TODO: We could implement this in the future? type ForeachArguments struct { Collection string `alloy:"collection,attr` - //TODO: Is the "var" argument really needed? - // We could just have a variable with a fixed name referencing the current thing we are iterating over. - Var string `alloy:"var,attr,optional` } func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *ForeachConfigNode { nodeID := BlockComponentID(block).String() + globalID := nodeID return &ForeachConfigNode{ - nodeID: nodeID, - label: block.Label, - block: block, + nodeID: nodeID, + label: block.Label, + block: block, + moduleController: globals.NewModuleController(globalID), } } @@ -40,8 +46,77 @@ func (fn *ForeachConfigNode) NodeID() string { return fn.nodeID } func (fn *ForeachConfigNode) Block() *ast.BlockStmt { return fn.block } func (fn *ForeachConfigNode) Evaluate(scope *vm.Scope) error { + cc, err := fn.moduleController.NewCustomComponent("", func(exports map[string]any) {}) + if err != nil { + return fmt.Errorf("creating custom component: %w", err) + } + + //TODO: Get the "template" block + //TODO: Prefix the custom components with something like "foreach.testForeach.1." + collection, template, err := getArgs(fn.block.Body) + + //TODO: Take into account the actual items in the collection. + // The custom components should be able to use the values from the collection. + loopCount := len(collection) + + for i := 0; i < loopCount; i++ { + args := map[string]any{} + customComponentRegistry := NewCustomComponentRegistry(nil, scope) + if err := cc.LoadBody(template, args, customComponentRegistry); err != nil { + return fmt.Errorf("updating custom component: %w", err) + } + + fn.customComponents = append(fn.customComponents, cc) + } return nil } +func getArgs(body ast.Body) ([]ast.Expr, ast.Body, error) { + var collection []ast.Expr + var template ast.Body + + if len(body) != 2 { + return nil, nil, fmt.Errorf("foreach block must have two children") + } + + for _, stmt := range body { + switch stmt := stmt.(type) { + case *ast.BlockStmt: + if stmt.Label != "template" { + return nil, nil, fmt.Errorf("unknown block") + } + template = stmt.Body + case *ast.AttributeStmt: + if stmt.Name.Name != "collection" { + return nil, nil, fmt.Errorf("unknown attribute") + } + attrExpr, ok := stmt.Value.(*ast.ArrayExpr) + if !ok { + return nil, nil, fmt.Errorf("collection must be an array") + } + collection = attrExpr.Elements + + default: + return nil, nil, fmt.Errorf("unknown argument") + } + } + + return collection, template, nil +} + func (fn *ForeachConfigNode) UpdateBlock(b *ast.BlockStmt) { } + +func (fn *ForeachConfigNode) Run(ctx context.Context) error { + wg := &sync.WaitGroup{} + for _, cc := range fn.customComponents { + wg.Add(1) + go func(cc CustomComponent) { + defer wg.Done() + //TODO: Get the error + cc.Run(ctx) + }(cc) + } + //TODO: Return all the errors from Run functions which failed + return nil +} diff --git a/internal/runtime/internal/testcomponents/sumation1.go b/internal/runtime/internal/testcomponents/sumation1.go index 1035c4b1c9..a1246b6993 100644 --- a/internal/runtime/internal/testcomponents/sumation1.go +++ b/internal/runtime/internal/testcomponents/sumation1.go @@ -10,7 +10,7 @@ import ( func init() { component.Register(component.Registration{ - Name: "testcomponents.summation_entry", + Name: "testcomponents.summation1", Stability: featuregate.StabilityPublicPreview, Args: SummationConfig_Entry{}, Exports: SummationExports_Entry{}, diff --git a/internal/runtime/internal/testcomponents/sumation2.go b/internal/runtime/internal/testcomponents/sumation2.go index 73e3f984bc..2ca1ee1653 100644 --- a/internal/runtime/internal/testcomponents/sumation2.go +++ b/internal/runtime/internal/testcomponents/sumation2.go @@ -27,11 +27,13 @@ type IntReceiver interface { } type IntReceiverImpl struct { - sum atomic.Int32 + sum atomic.Int32 + updateSumExport func(int) } func (r IntReceiverImpl) ReceiveInt(i int) { - r.sum.Add(int32(i)) + new := r.sum.Add(int32(i)) + r.updateSumExport(int(new)) } type SummationConfig_2 struct { @@ -52,6 +54,15 @@ type Summation_2 struct { // NewSummation creates a new summation component. func NewSummation_2(o component.Options, cfg SummationConfig_2) (*Summation_2, error) { recv := IntReceiverImpl{} + + recv.updateSumExport = func(newSum int) { + o.Logger.Log("msg", "Summation_2: new sum", "sum", newSum) + o.OnStateChange(SummationExports_2{ + Receiver: recv, + Sum: newSum, + }) + } + o.OnStateChange(SummationExports_2{ Receiver: recv, }) @@ -77,5 +88,6 @@ func (t *Summation_2) Run(ctx context.Context) error { // Update implements Component. func (t *Summation_2) Update(args component.Arguments) error { + return nil } diff --git a/internal/runtime/testdata/foreach/foreach_1.txtar b/internal/runtime/testdata/foreach/foreach_1.txtar index 9c372942a6..a8a1c11037 100644 --- a/internal/runtime/testdata/foreach/foreach_1.txtar +++ b/internal/runtime/testdata/foreach/foreach_1.txtar @@ -1,17 +1,18 @@ -- main.alloy -- foreach "testForeach" { collection = [1, 2, 3, 4] - //var = "num" - // Similar to testcomponents.summation, but with a "forward_to" - testcomponents.summation1 "sum" { - //TODO: Use the num variable here - // input = num - input = 1 - forward_to = testcomponents.summation2.final.receiver + template { + // Similar to testcomponents.summation, but with a "forward_to" + testcomponents.summation1 "sum" { + //TODO: Use the num variable here + // input = num + input = 1 + forward_to = [testcomponents.summation2.final.receiver] + } } } // Similar to testcomponents.summation, but with a "receiver" export testcomponents.summation2 "final" { -} \ No newline at end of file +}