Skip to content

Commit

Permalink
Initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Dec 16, 2024
1 parent 1fdec1b commit a4fa0a8
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 23 deletions.
16 changes: 13 additions & 3 deletions internal/runtime/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ func buildTestImportFile(t *testing.T, filename string) testImportFile {

// This is a copy of TestImportFile.
// It may need to be modified further to make it work with a foreach.
//
// TODO: Why does this test fail? It seems to be running multiple times,
// then fails on the final retry since it's interrupted by a shutdown
// as indicated in the "node exited without error" log message.
//
// TODO: Test a foreach inside a foreach.
// TODO: Test foreach with clustering.
func TestForeach(t *testing.T) {
directory := "./testdata/foreach"
for _, file := range getTestFiles(directory, t) {
Expand Down Expand Up @@ -132,6 +139,9 @@ func TestImportFile(t *testing.T) {
}
}

// TODO: Why does this test fail? It seems to be running multiple times,
// then fails on the final retry since it's interrupted by a shutdown
// as indicated in the "node exited without error" log message.
func TestImportString(t *testing.T) {
directory := "./testdata/import_string"
for _, file := range getTestFiles(directory, t) {
Expand Down Expand Up @@ -386,9 +396,9 @@ func testConfig2(t *testing.T, config string, reloadConfig string, update func()
require.Eventually(t, func() bool {
export := getExport[testcomponents.SummationExports_2](t, ctrl, "", "testcomponents.summation2.final")
// If each iteration of the for loop adds a 1,
// and there are 3 iterations, we expect 3 to be the end result.
//TODO: Make this configurable?
return export.Sum == 3
// and there are 4 iterations, we expect 4 to be the end result.
//TODO: Make the expected "sum" value configurable?
return export.Sum == 4
}, 3*time.Second, 10*time.Millisecond)

// if update != nil {
Expand Down
93 changes: 84 additions & 9 deletions internal/runtime/internal/controller/node_config_foreach.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,41 @@
package controller

import (
"context"
"fmt"
"sync"

"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/vm"
)

type ForeachConfigNode struct {
nodeID string
label string
block *ast.BlockStmt // Current Alloy blocks to derive config from
nodeID string
label string
block *ast.BlockStmt // Current Alloy blocks to derive config from
moduleController ModuleController
customComponents []CustomComponent
}

var _ BlockNode = (*ForeachConfigNode)(nil)
var _ RunnableNode = (*ForeachConfigNode)(nil)

// For now the Foreach doesn't have the ability to export arguments.
//TODO: We could implement this in the future?

type ForeachArguments struct {
Collection string `alloy:"collection,attr`
//TODO: Is the "var" argument really needed?
// We could just have a variable with a fixed name referencing the current thing we are iterating over.
Var string `alloy:"var,attr,optional`
}

func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *ForeachConfigNode {
nodeID := BlockComponentID(block).String()
globalID := nodeID

return &ForeachConfigNode{
nodeID: nodeID,
label: block.Label,
block: block,
nodeID: nodeID,
label: block.Label,
block: block,
moduleController: globals.NewModuleController(globalID),
}
}

Expand All @@ -40,8 +46,77 @@ func (fn *ForeachConfigNode) NodeID() string { return fn.nodeID }
func (fn *ForeachConfigNode) Block() *ast.BlockStmt { return fn.block }

func (fn *ForeachConfigNode) Evaluate(scope *vm.Scope) error {
cc, err := fn.moduleController.NewCustomComponent("", func(exports map[string]any) {})
if err != nil {
return fmt.Errorf("creating custom component: %w", err)
}

//TODO: Get the "template" block
//TODO: Prefix the custom components with something like "foreach.testForeach.1."
collection, template, err := getArgs(fn.block.Body)

//TODO: Take into account the actual items in the collection.
// The custom components should be able to use the values from the collection.
loopCount := len(collection)

for i := 0; i < loopCount; i++ {
args := map[string]any{}
customComponentRegistry := NewCustomComponentRegistry(nil, scope)
if err := cc.LoadBody(template, args, customComponentRegistry); err != nil {
return fmt.Errorf("updating custom component: %w", err)
}

fn.customComponents = append(fn.customComponents, cc)
}
return nil
}

func getArgs(body ast.Body) ([]ast.Expr, ast.Body, error) {
var collection []ast.Expr
var template ast.Body

if len(body) != 2 {
return nil, nil, fmt.Errorf("foreach block must have two children")
}

for _, stmt := range body {
switch stmt := stmt.(type) {
case *ast.BlockStmt:
if stmt.Label != "template" {
return nil, nil, fmt.Errorf("unknown block")
}
template = stmt.Body
case *ast.AttributeStmt:
if stmt.Name.Name != "collection" {
return nil, nil, fmt.Errorf("unknown attribute")
}
attrExpr, ok := stmt.Value.(*ast.ArrayExpr)
if !ok {
return nil, nil, fmt.Errorf("collection must be an array")
}
collection = attrExpr.Elements

default:
return nil, nil, fmt.Errorf("unknown argument")
}
}

return collection, template, nil
}

func (fn *ForeachConfigNode) UpdateBlock(b *ast.BlockStmt) {
}

func (fn *ForeachConfigNode) Run(ctx context.Context) error {
wg := &sync.WaitGroup{}
for _, cc := range fn.customComponents {
wg.Add(1)
go func(cc CustomComponent) {
defer wg.Done()
//TODO: Get the error
cc.Run(ctx)
}(cc)
}
//TODO: Return all the errors from Run functions which failed
return nil
}
2 changes: 1 addition & 1 deletion internal/runtime/internal/testcomponents/sumation1.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func init() {
component.Register(component.Registration{
Name: "testcomponents.summation_entry",
Name: "testcomponents.summation1",
Stability: featuregate.StabilityPublicPreview,
Args: SummationConfig_Entry{},
Exports: SummationExports_Entry{},
Expand Down
16 changes: 14 additions & 2 deletions internal/runtime/internal/testcomponents/sumation2.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ type IntReceiver interface {
}

type IntReceiverImpl struct {
sum atomic.Int32
sum atomic.Int32
updateSumExport func(int)
}

func (r IntReceiverImpl) ReceiveInt(i int) {
r.sum.Add(int32(i))
new := r.sum.Add(int32(i))
r.updateSumExport(int(new))
}

type SummationConfig_2 struct {
Expand All @@ -52,6 +54,15 @@ type Summation_2 struct {
// NewSummation creates a new summation component.
func NewSummation_2(o component.Options, cfg SummationConfig_2) (*Summation_2, error) {
recv := IntReceiverImpl{}

recv.updateSumExport = func(newSum int) {
o.Logger.Log("msg", "Summation_2: new sum", "sum", newSum)
o.OnStateChange(SummationExports_2{
Receiver: recv,
Sum: newSum,
})
}

o.OnStateChange(SummationExports_2{
Receiver: recv,
})
Expand All @@ -77,5 +88,6 @@ func (t *Summation_2) Run(ctx context.Context) error {

// Update implements Component.
func (t *Summation_2) Update(args component.Arguments) error {

return nil
}
17 changes: 9 additions & 8 deletions internal/runtime/testdata/foreach/foreach_1.txtar
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
-- main.alloy --
foreach "testForeach" {
collection = [1, 2, 3, 4]
//var = "num"

// Similar to testcomponents.summation, but with a "forward_to"
testcomponents.summation1 "sum" {
//TODO: Use the num variable here
// input = num
input = 1
forward_to = testcomponents.summation2.final.receiver
template {
// Similar to testcomponents.summation, but with a "forward_to"
testcomponents.summation1 "sum" {
//TODO: Use the num variable here
// input = num
input = 1
forward_to = [testcomponents.summation2.final.receiver]
}
}
}

// Similar to testcomponents.summation, but with a "receiver" export
testcomponents.summation2 "final" {
}
}

0 comments on commit a4fa0a8

Please sign in to comment.