diff --git a/internal/runtime/internal/controller/node_config_foreach.go b/internal/runtime/internal/controller/node_config_foreach.go index c24215d589..618bc577c6 100644 --- a/internal/runtime/internal/controller/node_config_foreach.go +++ b/internal/runtime/internal/controller/node_config_foreach.go @@ -2,6 +2,8 @@ package controller import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" "hash/fnv" "sync" @@ -16,7 +18,8 @@ type ForeachConfigNode struct { label string moduleController ModuleController - customComponents map[string]CustomComponent + customComponents map[string]CustomComponent + customComponentHashCounts map[string]int forEachChildrenUpdateChan chan struct{} // used to trigger an update of the running children forEachChildrenRunning bool @@ -46,6 +49,7 @@ func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *Forea moduleController: globals.NewModuleController(globalID), forEachChildrenUpdateChan: make(chan struct{}, 1), customComponents: make(map[string]CustomComponent, 0), + customComponentHashCounts: make(map[string]int, 0), } } @@ -88,10 +92,17 @@ func (fn *ForeachConfigNode) Evaluate(scope *vm.Scope) error { // Loop through the items to create the custom components. // On re-evaluation new components are added and existing ones are updated. newCustomComponentIds := make(map[string]bool, len(args.Collection)) - // find something for the ids because we cannot use numbers - tmp := []string{"aaa", "bbb", "ccc", "ddd"} + fn.customComponentHashCounts = make(map[string]int) for i := 0; i < len(args.Collection); i++ { - customComponentID := tmp[i] + + // We must create an ID from the collection entries to avoid recreating all components on every updates. + // We track the hash counts because the collection might contain duplicates ([1, 1, 1] would result in the same ids + // so we handle it by adding the count at the end -> [11, 12, 13] + customComponentID := fmt.Sprintf("foreach_%s", hashObject(args.Collection[i])) + count := fn.customComponentHashCounts[customComponentID] // count = 0 if the key is not found + fn.customComponentHashCounts[customComponentID] = count + 1 + customComponentID += fmt.Sprintf("_%d", count+1) + cc, err := fn.getOrCreateCustomComponent(customComponentID) if err != nil { return err @@ -219,3 +230,19 @@ func (fi *forEachChild) Hash() uint64 { func (fi *forEachChild) Equals(other runner.Task) bool { return fi.id == other.(*forEachChild).id } + +func computeHash(s string) string { + hasher := sha256.New() + hasher.Write([]byte(s)) + fullHash := hasher.Sum(nil) + return hex.EncodeToString(fullHash[:12]) // taking only the 12 first char of the hash should be enough +} + +func hashObject(obj any) string { + switch v := obj.(type) { + case int, string, float64, bool: + return fmt.Sprintf("%v", v) + default: + return computeHash(fmt.Sprintf("%#v", v)) + } +} diff --git a/internal/runtime/internal/controller/node_config_foreach_test.go b/internal/runtime/internal/controller/node_config_foreach_test.go new file mode 100644 index 0000000000..1e21effed6 --- /dev/null +++ b/internal/runtime/internal/controller/node_config_foreach_test.go @@ -0,0 +1,290 @@ +package controller + +import ( + "context" + "os" + "sync/atomic" + "testing" + "time" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging" + "github.com/grafana/alloy/syntax/ast" + "github.com/grafana/alloy/syntax/parser" + "github.com/grafana/alloy/syntax/vm" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" +) + +func TestCreateCustomComponents(t *testing.T) { + config := `foreach "default" { + collection = [1, 2, 3] + var = "num" + template { + } + }` + foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t)) + require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{})))) + customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents + require.ElementsMatch(t, customComponentIds, []string{"foreach_1_1", "foreach_2_1", "foreach_3_1"}) + keys := make([]string, 0, len(foreachConfigNode.customComponents)) + for key := range foreachConfigNode.customComponents { + keys = append(keys, key) + } + require.ElementsMatch(t, keys, []string{"foreach_1_1", "foreach_2_1", "foreach_3_1"}) +} + +func TestCreateCustomComponentsDuplicatedIds(t *testing.T) { + config := `foreach "default" { + collection = [1, 2, 1] + var = "num" + template { + } + }` + foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t)) + require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{})))) + customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents + require.ElementsMatch(t, customComponentIds, []string{"foreach_1_1", "foreach_2_1", "foreach_1_2"}) + keys := make([]string, 0, len(foreachConfigNode.customComponents)) + for key := range foreachConfigNode.customComponents { + keys = append(keys, key) + } + require.ElementsMatch(t, keys, []string{"foreach_1_1", "foreach_2_1", "foreach_1_2"}) +} + +func TestCreateCustomComponentsWithUpdate(t *testing.T) { + config := `foreach "default" { + collection = [1, 2, 3] + var = "num" + template { + } + }` + foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t)) + require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{})))) + customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents + require.ElementsMatch(t, customComponentIds, []string{"foreach_1_1", "foreach_2_1", "foreach_3_1"}) + keys := make([]string, 0, len(foreachConfigNode.customComponents)) + for key := range foreachConfigNode.customComponents { + keys = append(keys, key) + } + require.ElementsMatch(t, keys, []string{"foreach_1_1", "foreach_2_1", "foreach_3_1"}) + + newConfig := `foreach "default" { + collection = [2, 1, 1] + var = "num" + template { + } + }` + foreachConfigNode.moduleController.(*ModuleControllerMock).Reset() + foreachConfigNode.UpdateBlock(getBlockFromConfig(t, newConfig)) + require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{})))) + customComponentIds = foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents + + // Only the 2nd "1" item in the collection is created because the two others were already created. + require.ElementsMatch(t, customComponentIds, []string{"foreach_1_2"}) + + // "foreach31" was removed, "foreach12" was added + keys = make([]string, 0, len(foreachConfigNode.customComponents)) + for key := range foreachConfigNode.customComponents { + keys = append(keys, key) + } + require.ElementsMatch(t, keys, []string{"foreach_1_1", "foreach_2_1", "foreach_1_2"}) +} + +func TestRunCustomComponents(t *testing.T) { + config := `foreach "default" { + collection = [1, 2, 3] + var = "num" + template { + } + }` + foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t)) + require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{})))) + ctx, cancel := context.WithCancel(context.Background()) + go foreachConfigNode.Run(ctx) + + // check that all custom components are running correctly + require.EventuallyWithT(t, func(c *assert.CollectT) { + for _, cc := range foreachConfigNode.customComponents { + assert.True(c, cc.(*CustomComponentMock).IsRunning.Load()) + } + }, 1*time.Second, 5*time.Millisecond) + + cancel() + // check that all custom components are stopped + require.EventuallyWithT(t, func(c *assert.CollectT) { + for _, cc := range foreachConfigNode.customComponents { + assert.False(c, cc.(*CustomComponentMock).IsRunning.Load()) + } + }, 1*time.Second, 5*time.Millisecond) +} + +func TestRunCustomComponentsAfterUpdate(t *testing.T) { + config := `foreach "default" { + collection = [1, 2, 3] + var = "num" + template { + } + }` + foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t)) + require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{})))) + ctx, cancel := context.WithCancel(context.Background()) + go foreachConfigNode.Run(ctx) + + // check that all custom components are running correctly + require.EventuallyWithT(t, func(c *assert.CollectT) { + for _, cc := range foreachConfigNode.customComponents { + assert.True(c, cc.(*CustomComponentMock).IsRunning.Load()) + } + }, 1*time.Second, 5*time.Millisecond) + + newConfig := `foreach "default" { + collection = [2, 1, 1] + var = "num" + template { + } + }` + foreachConfigNode.moduleController.(*ModuleControllerMock).Reset() + foreachConfigNode.UpdateBlock(getBlockFromConfig(t, newConfig)) + require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{})))) + + newComponentIds := []string{"foreach_1_1", "foreach_2_1", "foreach_1_2"} + // check that all new custom components are running correctly + require.EventuallyWithT(t, func(c *assert.CollectT) { + for id, cc := range foreachConfigNode.customComponents { + assert.Contains(c, newComponentIds, id) + assert.True(c, cc.(*CustomComponentMock).IsRunning.Load()) + } + }, 1*time.Second, 5*time.Millisecond) + + cancel() + // check that all custom components are stopped + require.EventuallyWithT(t, func(c *assert.CollectT) { + for _, cc := range foreachConfigNode.customComponents { + assert.False(c, cc.(*CustomComponentMock).IsRunning.Load()) + } + }, 1*time.Second, 5*time.Millisecond) +} + +func TestCreateCustomComponentsCollectionObjectsWithUpdate(t *testing.T) { + config := `foreach "default" { + collection = [obj1, obj2] + var = "num" + template { + } + }` + foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t)) + vars := map[string]interface{}{ + "obj1": map[string]string{ + "label1": "a", + "label2": "b", + }, + "obj2": map[string]string{ + "label3": "c", + }, + } + require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(vars))) + customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents + require.ElementsMatch(t, customComponentIds, []string{"foreach_be19d02a2ccb2cbc2c47e90d_1", "foreach_b335d50e2e8490eb8bf5f51b_1"}) + keys := make([]string, 0, len(foreachConfigNode.customComponents)) + for key := range foreachConfigNode.customComponents { + keys = append(keys, key) + } + require.ElementsMatch(t, keys, []string{"foreach_be19d02a2ccb2cbc2c47e90d_1", "foreach_b335d50e2e8490eb8bf5f51b_1"}) + + newConfig := `foreach "default" { + collection = [obj1, obj3] + var = "num" + template { + } + }` + vars2 := map[string]interface{}{ + "obj1": map[string]string{ + "label1": "a", + "label2": "b", + }, + "obj3": map[string]string{ + "label3": "d", + }, + } + foreachConfigNode.moduleController.(*ModuleControllerMock).Reset() + foreachConfigNode.UpdateBlock(getBlockFromConfig(t, newConfig)) + require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(vars2))) + customComponentIds = foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents + + // Create only the custom component for the obj3 because the one for obj1 was already created + require.ElementsMatch(t, customComponentIds, []string{"foreach_1464766cf9c8fd1095d0f7a2_1"}) + + // "foreachb335d50e2e8490eb8bf5f51b1" was removed, "foreach1464766cf9c8fd1095d0f7a21" was added + keys = make([]string, 0, len(foreachConfigNode.customComponents)) + for key := range foreachConfigNode.customComponents { + keys = append(keys, key) + } + require.ElementsMatch(t, keys, []string{"foreach_be19d02a2ccb2cbc2c47e90d_1", "foreach_1464766cf9c8fd1095d0f7a2_1"}) +} + +func getBlockFromConfig(t *testing.T, config string) *ast.BlockStmt { + file, err := parser.ParseFile("", []byte(config)) + require.NoError(t, err) + return file.Body[0].(*ast.BlockStmt) +} + +func getComponentGlobals(t *testing.T) ComponentGlobals { + l, _ := logging.New(os.Stderr, logging.DefaultOptions) + return ComponentGlobals{ + Logger: l, + TraceProvider: noop.NewTracerProvider(), + DataPath: t.TempDir(), + MinStability: featuregate.StabilityGenerallyAvailable, + OnBlockNodeUpdate: func(cn BlockNode) { /* no-op */ }, + Registerer: prometheus.NewRegistry(), + NewModuleController: func(id string) ModuleController { + return NewModuleControllerMock() + }, + } +} + +type ModuleControllerMock struct { + CustomComponents []string +} + +func NewModuleControllerMock() ModuleController { + return &ModuleControllerMock{ + CustomComponents: make([]string, 0), + } +} + +func (m *ModuleControllerMock) NewModule(id string, export component.ExportFunc) (component.Module, error) { + return nil, nil +} + +func (m *ModuleControllerMock) ModuleIDs() []string { + return nil +} + +func (m *ModuleControllerMock) NewCustomComponent(id string, export component.ExportFunc) (CustomComponent, error) { + m.CustomComponents = append(m.CustomComponents, id) + return &CustomComponentMock{}, nil +} + +func (m *ModuleControllerMock) Reset() { + m.CustomComponents = make([]string, 0) +} + +type CustomComponentMock struct { + IsRunning atomic.Bool +} + +func (c *CustomComponentMock) LoadBody(body ast.Body, args map[string]any, customComponentRegistry *CustomComponentRegistry) error { + return nil +} + +func (c *CustomComponentMock) Run(ctx context.Context) error { + c.IsRunning.Store(true) + <-ctx.Done() + c.IsRunning.Store(false) + return nil +}