Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum authored and ptodev committed Dec 16, 2024
1 parent a4fa0a8 commit c29bf8d
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 31 deletions.
3 changes: 2 additions & 1 deletion internal/runtime/internal/controller/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
exportBlockID = "export"
loggingBlockID = "logging"
tracingBlockID = "tracing"
foreachID = "foreach"
)

// NewConfigNode creates a new ConfigNode from an initial ast.BlockStmt.
Expand All @@ -29,7 +30,7 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d
return NewTracingConfigNode(block, globals), nil
case importsource.BlockImportFile, importsource.BlockImportString, importsource.BlockImportHTTP, importsource.BlockImportGit:
return NewImportConfigNode(block, globals, importsource.GetSourceType(block.GetBlockName())), nil
case importsource.BlockForeach:
case foreachID:
return NewForeachConfigNode(block, globals), nil
default:
var diags diag.Diagnostics
Expand Down
171 changes: 145 additions & 26 deletions internal/runtime/internal/controller/node_config_foreach.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@ package controller
import (
"context"
"fmt"
"hash/fnv"
"sync"

"github.com/grafana/alloy/internal/runner"
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/vm"
)

type ForeachConfigNode struct {
nodeID string
label string
block *ast.BlockStmt // Current Alloy blocks to derive config from
moduleController ModuleController
customComponents []CustomComponent

customComponents map[string]CustomComponent

forEachChildrenUpdateChan chan struct{} // used to trigger an update of the running children
forEachChildrenRunning bool

mut sync.RWMutex
block *ast.BlockStmt
eval *vm.Evaluator
}

var _ BlockNode = (*ForeachConfigNode)(nil)
Expand All @@ -24,18 +33,21 @@ var _ RunnableNode = (*ForeachConfigNode)(nil)
//TODO: We could implement this in the future?

type ForeachArguments struct {
Collection string `alloy:"collection,attr`
Collection []string `alloy:"collection,attr"`
}

func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *ForeachConfigNode {
nodeID := BlockComponentID(block).String()
globalID := nodeID

return &ForeachConfigNode{
nodeID: nodeID,
label: block.Label,
block: block,
moduleController: globals.NewModuleController(globalID),
nodeID: nodeID,
label: block.Label,
block: block,
eval: vm.New(block.Body),
moduleController: globals.NewModuleController(globalID),
forEachChildrenUpdateChan: make(chan struct{}, 1),
customComponents: make(map[string]CustomComponent, 0),
}
}

Expand All @@ -46,31 +58,129 @@ func (fn *ForeachConfigNode) NodeID() string { return fn.nodeID }
func (fn *ForeachConfigNode) Block() *ast.BlockStmt { return fn.block }

func (fn *ForeachConfigNode) Evaluate(scope *vm.Scope) error {
cc, err := fn.moduleController.NewCustomComponent("", func(exports map[string]any) {})
if err != nil {
return fmt.Errorf("creating custom component: %w", err)
}
fn.mut.Lock()
defer fn.mut.Unlock()

//TODO: Get the "template" block
//TODO: Prefix the custom components with something like "foreach.testForeach.1."
//TODO: find a way to evaluate the block?
collection, template, err := getArgs(fn.block.Body)
if err != nil {
return fmt.Errorf("parsing foreach block: %w", err)
}

//TODO: Take into account the actual items in the collection.
// The custom components should be able to use the values from the collection.
loopCount := len(collection)

// Loop through the items to create the custom components.
// On re-evaluation new components are added and existing ones are updated.
newCustomComponentIds := make(map[string]bool, loopCount)
tmp := []string{"aaa", "bbb", "ccc", "ddd"}
for i := 0; i < loopCount; i++ {
customComponentID := tmp[i]
cc, err := fn.getOrCreateCustomComponent(customComponentID)
if err != nil {
return err
}

args := map[string]any{}
// TODO: use the registry from the loader to access the modules
customComponentRegistry := NewCustomComponentRegistry(nil, scope)
if err := cc.LoadBody(template, args, customComponentRegistry); err != nil {
return fmt.Errorf("updating custom component: %w", err)
return fmt.Errorf("updating custom component in foreach: %w", err)
}
newCustomComponentIds[customComponentID] = true
}

fn.customComponents = append(fn.customComponents, cc)
// Delete the custom components that are no longer in the foreach.
// The runner pkg will stop them properly.
for id := range fn.customComponents {
if _, exist := newCustomComponentIds[id]; !exist {
delete(fn.customComponents, id)
}
}

// trigger to stop previous children from running and to start running the new ones.
if fn.forEachChildrenRunning {
select {
case fn.forEachChildrenUpdateChan <- struct{}{}: // queued trigger
default: // trigger already queued; no-op
}
}
return nil
}

func (fn *ForeachConfigNode) getOrCreateCustomComponent(customComponentID string) (CustomComponent, error) {
cc, exists := fn.customComponents[customComponentID]
if exists {
return cc, nil
}

newCC, err := fn.moduleController.NewCustomComponent(customComponentID, func(exports map[string]any) {})
if err != nil {
return nil, fmt.Errorf("creating custom component: %w", err)
}
fn.customComponents[customComponentID] = newCC
return newCC, nil
}

func (fn *ForeachConfigNode) UpdateBlock(b *ast.BlockStmt) {
fn.mut.Lock()
defer fn.mut.Unlock()
fn.block = b
}

func (fn *ForeachConfigNode) Run(ctx context.Context) error {
newCtx, cancel := context.WithCancel(ctx)
defer cancel()

runner := runner.New(func(forEachChild *forEachChild) runner.Worker {
return &forEachChildRunner{
child: forEachChild,
}
})
defer runner.Stop()

updateTasks := func() error {
fn.mut.Lock()
defer fn.mut.Unlock()
fn.forEachChildrenRunning = true
var tasks []*forEachChild
for customComponentID, customComponent := range fn.customComponents {
tasks = append(tasks, &forEachChild{
id: customComponentID,
cc: customComponent,
})
}

return runner.ApplyTasks(newCtx, tasks)
}

err := updateTasks()
if err != nil {
// TODO: log error
}

errChan := make(chan error, 1)
err = fn.run(errChan, updateTasks)
return err
}

func (fn *ForeachConfigNode) run(errChan chan error, updateTasks func() error) error {
for {
select {
case <-fn.forEachChildrenUpdateChan:
err := updateTasks()
if err != nil {
// TODO: log error
}
case err := <-errChan:
return err
}
}
}

func getArgs(body ast.Body) ([]ast.Expr, ast.Body, error) {
var collection []ast.Expr
var template ast.Body
Expand All @@ -82,7 +192,7 @@ func getArgs(body ast.Body) ([]ast.Expr, ast.Body, error) {
for _, stmt := range body {
switch stmt := stmt.(type) {
case *ast.BlockStmt:
if stmt.Label != "template" {
if stmt.GetBlockName() != "template" {
return nil, nil, fmt.Errorf("unknown block")
}
template = stmt.Body
Expand All @@ -104,19 +214,28 @@ func getArgs(body ast.Body) ([]ast.Expr, ast.Body, error) {
return collection, template, nil
}

func (fn *ForeachConfigNode) UpdateBlock(b *ast.BlockStmt) {
type forEachChildRunner struct {
child *forEachChild
}

func (fn *ForeachConfigNode) Run(ctx context.Context) error {
wg := &sync.WaitGroup{}
for _, cc := range fn.customComponents {
wg.Add(1)
go func(cc CustomComponent) {
defer wg.Done()
//TODO: Get the error
cc.Run(ctx)
}(cc)
type forEachChild struct {
cc CustomComponent
id string
}

func (fr *forEachChildRunner) Run(ctx context.Context) {
err := fr.child.cc.Run(ctx)
if err != nil {
// TODO: log and update health
}
//TODO: Return all the errors from Run functions which failed
return nil
}

func (fi *forEachChild) Hash() uint64 {
fnvHash := fnv.New64a()
fnvHash.Write([]byte(fi.id))
return fnvHash.Sum64()
}

func (fi *forEachChild) Equals(other runner.Task) bool {
return fi.id == other.(*forEachChild).id
}
4 changes: 0 additions & 4 deletions internal/runtime/internal/importsource/import_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ const (
String
Git
HTTP
Foreach
)

const (
BlockImportFile = "import.file"
BlockImportString = "import.string"
BlockImportHTTP = "import.http"
BlockImportGit = "import.git"
BlockForeach = "foreach"
)

const ModulePath = "module_path"
Expand Down Expand Up @@ -69,8 +67,6 @@ func GetSourceType(fullName string) SourceType {
return HTTP
case BlockImportGit:
return Git
case BlockForeach:
return Foreach
}
panic(fmt.Errorf("name does not map to a known source type: %v", fullName))
}

0 comments on commit c29bf8d

Please sign in to comment.