diff --git a/internal/runtime/internal/controller/node_config.go b/internal/runtime/internal/controller/node_config.go index 5338237caf..ff8c6647db 100644 --- a/internal/runtime/internal/controller/node_config.go +++ b/internal/runtime/internal/controller/node_config.go @@ -13,6 +13,7 @@ const ( exportBlockID = "export" loggingBlockID = "logging" tracingBlockID = "tracing" + foreachID = "foreach" ) // NewConfigNode creates a new ConfigNode from an initial ast.BlockStmt. @@ -29,7 +30,7 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d return NewTracingConfigNode(block, globals), nil case importsource.BlockImportFile, importsource.BlockImportString, importsource.BlockImportHTTP, importsource.BlockImportGit: return NewImportConfigNode(block, globals, importsource.GetSourceType(block.GetBlockName())), nil - case importsource.BlockForeach: + case foreachID: return NewForeachConfigNode(block, globals), nil default: var diags diag.Diagnostics diff --git a/internal/runtime/internal/controller/node_config_foreach.go b/internal/runtime/internal/controller/node_config_foreach.go index 1619df4f6c..3bba73a5dd 100644 --- a/internal/runtime/internal/controller/node_config_foreach.go +++ b/internal/runtime/internal/controller/node_config_foreach.go @@ -3,8 +3,10 @@ package controller import ( "context" "fmt" + "hash/fnv" "sync" + "github.com/grafana/alloy/internal/runner" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" ) @@ -12,9 +14,16 @@ import ( type ForeachConfigNode struct { nodeID string label string - block *ast.BlockStmt // Current Alloy blocks to derive config from moduleController ModuleController - customComponents []CustomComponent + + customComponents map[string]CustomComponent + + forEachChildrenUpdateChan chan struct{} // used to trigger an update of the running children + forEachChildrenRunning bool + + mut sync.RWMutex + block *ast.BlockStmt + eval *vm.Evaluator } var _ BlockNode = (*ForeachConfigNode)(nil) @@ -24,7 +33,7 @@ var _ RunnableNode = (*ForeachConfigNode)(nil) //TODO: We could implement this in the future? type ForeachArguments struct { - Collection string `alloy:"collection,attr` + Collection []string `alloy:"collection,attr"` } func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *ForeachConfigNode { @@ -32,10 +41,13 @@ func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *Forea globalID := nodeID return &ForeachConfigNode{ - nodeID: nodeID, - label: block.Label, - block: block, - moduleController: globals.NewModuleController(globalID), + nodeID: nodeID, + label: block.Label, + block: block, + eval: vm.New(block.Body), + moduleController: globals.NewModuleController(globalID), + forEachChildrenUpdateChan: make(chan struct{}, 1), + customComponents: make(map[string]CustomComponent, 0), } } @@ -46,31 +58,129 @@ func (fn *ForeachConfigNode) NodeID() string { return fn.nodeID } func (fn *ForeachConfigNode) Block() *ast.BlockStmt { return fn.block } func (fn *ForeachConfigNode) Evaluate(scope *vm.Scope) error { - cc, err := fn.moduleController.NewCustomComponent("", func(exports map[string]any) {}) - if err != nil { - return fmt.Errorf("creating custom component: %w", err) - } + fn.mut.Lock() + defer fn.mut.Unlock() //TODO: Get the "template" block //TODO: Prefix the custom components with something like "foreach.testForeach.1." + //TODO: find a way to evaluate the block? collection, template, err := getArgs(fn.block.Body) + if err != nil { + return fmt.Errorf("parsing foreach block: %w", err) + } //TODO: Take into account the actual items in the collection. // The custom components should be able to use the values from the collection. loopCount := len(collection) + // Loop through the items to create the custom components. + // On re-evaluation new components are added and existing ones are updated. + newCustomComponentIds := make(map[string]bool, loopCount) + tmp := []string{"aaa", "bbb", "ccc", "ddd"} for i := 0; i < loopCount; i++ { + customComponentID := tmp[i] + cc, err := fn.getOrCreateCustomComponent(customComponentID) + if err != nil { + return err + } + args := map[string]any{} + // TODO: use the registry from the loader to access the modules customComponentRegistry := NewCustomComponentRegistry(nil, scope) if err := cc.LoadBody(template, args, customComponentRegistry); err != nil { - return fmt.Errorf("updating custom component: %w", err) + return fmt.Errorf("updating custom component in foreach: %w", err) } + newCustomComponentIds[customComponentID] = true + } - fn.customComponents = append(fn.customComponents, cc) + // Delete the custom components that are no longer in the foreach. + // The runner pkg will stop them properly. + for id := range fn.customComponents { + if _, exist := newCustomComponentIds[id]; !exist { + delete(fn.customComponents, id) + } + } + + // trigger to stop previous children from running and to start running the new ones. + if fn.forEachChildrenRunning { + select { + case fn.forEachChildrenUpdateChan <- struct{}{}: // queued trigger + default: // trigger already queued; no-op + } } return nil } +func (fn *ForeachConfigNode) getOrCreateCustomComponent(customComponentID string) (CustomComponent, error) { + cc, exists := fn.customComponents[customComponentID] + if exists { + return cc, nil + } + + newCC, err := fn.moduleController.NewCustomComponent(customComponentID, func(exports map[string]any) {}) + if err != nil { + return nil, fmt.Errorf("creating custom component: %w", err) + } + fn.customComponents[customComponentID] = newCC + return newCC, nil +} + +func (fn *ForeachConfigNode) UpdateBlock(b *ast.BlockStmt) { + fn.mut.Lock() + defer fn.mut.Unlock() + fn.block = b +} + +func (fn *ForeachConfigNode) Run(ctx context.Context) error { + newCtx, cancel := context.WithCancel(ctx) + defer cancel() + + runner := runner.New(func(forEachChild *forEachChild) runner.Worker { + return &forEachChildRunner{ + child: forEachChild, + } + }) + defer runner.Stop() + + updateTasks := func() error { + fn.mut.Lock() + defer fn.mut.Unlock() + fn.forEachChildrenRunning = true + var tasks []*forEachChild + for customComponentID, customComponent := range fn.customComponents { + tasks = append(tasks, &forEachChild{ + id: customComponentID, + cc: customComponent, + }) + } + + return runner.ApplyTasks(newCtx, tasks) + } + + err := updateTasks() + if err != nil { + // TODO: log error + } + + errChan := make(chan error, 1) + err = fn.run(errChan, updateTasks) + return err +} + +func (fn *ForeachConfigNode) run(errChan chan error, updateTasks func() error) error { + for { + select { + case <-fn.forEachChildrenUpdateChan: + err := updateTasks() + if err != nil { + // TODO: log error + } + case err := <-errChan: + return err + } + } +} + func getArgs(body ast.Body) ([]ast.Expr, ast.Body, error) { var collection []ast.Expr var template ast.Body @@ -82,7 +192,7 @@ func getArgs(body ast.Body) ([]ast.Expr, ast.Body, error) { for _, stmt := range body { switch stmt := stmt.(type) { case *ast.BlockStmt: - if stmt.Label != "template" { + if stmt.GetBlockName() != "template" { return nil, nil, fmt.Errorf("unknown block") } template = stmt.Body @@ -104,19 +214,28 @@ func getArgs(body ast.Body) ([]ast.Expr, ast.Body, error) { return collection, template, nil } -func (fn *ForeachConfigNode) UpdateBlock(b *ast.BlockStmt) { +type forEachChildRunner struct { + child *forEachChild } -func (fn *ForeachConfigNode) Run(ctx context.Context) error { - wg := &sync.WaitGroup{} - for _, cc := range fn.customComponents { - wg.Add(1) - go func(cc CustomComponent) { - defer wg.Done() - //TODO: Get the error - cc.Run(ctx) - }(cc) +type forEachChild struct { + cc CustomComponent + id string +} + +func (fr *forEachChildRunner) Run(ctx context.Context) { + err := fr.child.cc.Run(ctx) + if err != nil { + // TODO: log and update health } - //TODO: Return all the errors from Run functions which failed - return nil +} + +func (fi *forEachChild) Hash() uint64 { + fnvHash := fnv.New64a() + fnvHash.Write([]byte(fi.id)) + return fnvHash.Sum64() +} + +func (fi *forEachChild) Equals(other runner.Task) bool { + return fi.id == other.(*forEachChild).id } diff --git a/internal/runtime/internal/importsource/import_source.go b/internal/runtime/internal/importsource/import_source.go index e5b8d7ecd6..ce3a369b98 100644 --- a/internal/runtime/internal/importsource/import_source.go +++ b/internal/runtime/internal/importsource/import_source.go @@ -15,7 +15,6 @@ const ( String Git HTTP - Foreach ) const ( @@ -23,7 +22,6 @@ const ( BlockImportString = "import.string" BlockImportHTTP = "import.http" BlockImportGit = "import.git" - BlockForeach = "foreach" ) const ModulePath = "module_path" @@ -69,8 +67,6 @@ func GetSourceType(fullName string) SourceType { return HTTP case BlockImportGit: return Git - case BlockForeach: - return Foreach } panic(fmt.Errorf("name does not map to a known source type: %v", fullName)) }