Skip to content

Commit

Permalink
fix foreach run
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed Dec 17, 2024
1 parent 25a6902 commit 1984a4c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
5 changes: 5 additions & 0 deletions internal/runtime/alloy.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func (f *Runtime) Run(ctx context.Context) {
components = f.loader.Components()
services = f.loader.Services()
imports = f.loader.Imports()
forEachs = f.loader.ForEachs()

runnables = make([]controller.RunnableNode, 0, len(components)+len(services)+len(imports))
)
Expand All @@ -276,6 +277,10 @@ func (f *Runtime) Run(ctx context.Context) {
runnables = append(runnables, i)
}

for _, fe := range forEachs {
runnables = append(runnables, fe)
}

// Only the root controller should run services, since modules share the
// same service instance as the root.
if !f.opts.IsModule {
Expand Down
9 changes: 9 additions & 0 deletions internal/runtime/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Loader struct {
componentNodes []ComponentNode
declareNodes map[string]*DeclareNode
importConfigNodes map[string]*ImportConfigNode
forEachNodes map[string]*ForeachConfigNode
serviceNodes []*ServiceNode
cache *valueCache
blocks []*ast.BlockStmt // Most recently loaded blocks, used for writing
Expand Down Expand Up @@ -547,6 +548,7 @@ func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, con
}

l.importConfigNodes = nodeMap.importMap
l.forEachNodes = nodeMap.foreachMap

return diags
}
Expand Down Expand Up @@ -681,6 +683,13 @@ func (l *Loader) Imports() map[string]*ImportConfigNode {
return l.importConfigNodes
}

// ForEachs returns the current set of for_each nodes.
func (l *Loader) ForEachs() map[string]*ForeachConfigNode {
l.mut.RLock()
defer l.mut.RUnlock()
return l.forEachNodes
}

// Graph returns a copy of the DAG managed by the Loader.
func (l *Loader) Graph() *dag.Graph {
l.mut.RLock()
Expand Down
10 changes: 4 additions & 6 deletions internal/runtime/internal/controller/node_config_foreach.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,19 @@ func (fn *ForeachConfigNode) Run(ctx context.Context) error {
// TODO: log error
}

errChan := make(chan error, 1)
err = fn.run(errChan, updateTasks)
return err
return fn.run(ctx, updateTasks)
}

func (fn *ForeachConfigNode) run(errChan chan error, updateTasks func() error) error {
func (fn *ForeachConfigNode) run(ctx context.Context, updateTasks func() error) error {
for {
select {
case <-fn.forEachChildrenUpdateChan:
err := updateTasks()
if err != nil {
// TODO: log error
}
case err := <-errChan:
return err
case <-ctx.Done():
return nil
}
}
}
Expand Down

0 comments on commit 1984a4c

Please sign in to comment.