diff --git a/internal/runtime/internal/controller/loader.go b/internal/runtime/internal/controller/loader.go index fae75f5865..51ff084234 100644 --- a/internal/runtime/internal/controller/loader.go +++ b/internal/runtime/internal/controller/loader.go @@ -149,7 +149,9 @@ func (l *Loader) Apply(options ApplyOptions) diag.Diagnostics { l.cm.controllerEvaluation.Set(1) defer l.cm.controllerEvaluation.Set(0) - l.cache.SetScope(options.ArgScope) + if options.ArgScope != nil { + l.cache.UpdateScopeVariables(options.ArgScope.Variables) + } for key, value := range options.Args { l.cache.CacheModuleArgument(key, value) @@ -615,7 +617,7 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics { // Finally, wire component references. l.cache.mut.RLock() - refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.scope, l.globals.MinStability) + refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.GetScope(), l.globals.MinStability) l.cache.mut.RUnlock() for _, ref := range refs { g.AddEdge(dag.Edge{From: n, To: ref.Target}) @@ -645,7 +647,7 @@ func (l *Loader) wireCustomComponentNode(g *dag.Graph, cc *CustomComponentNode) // Variables returns the Variables the Loader exposes for other components to // reference. func (l *Loader) Variables() map[string]interface{} { - return l.cache.BuildContext().Variables + return l.cache.GetScope().Variables } // Components returns the current set of loaded components. @@ -780,7 +782,7 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr var err error switch n := n.(type) { case BlockNode: - ectx := l.cache.BuildContext() + ectx := l.cache.GetScope() // RLock before evaluate to prevent Evaluating while the config is being reloaded l.mut.RLock() @@ -819,7 +821,7 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr // evaluate constructs the final context for the BlockNode and // evaluates it. mut must be held when calling evaluate. func (l *Loader) evaluate(logger log.Logger, bn BlockNode) error { - ectx := l.cache.BuildContext() + ectx := l.cache.GetScope() err := bn.Evaluate(ectx) return l.postEvaluate(logger, bn, err) } @@ -829,12 +831,11 @@ func (l *Loader) evaluate(logger log.Logger, bn BlockNode) error { func (l *Loader) postEvaluate(logger log.Logger, bn BlockNode, err error) error { switch c := bn.(type) { case ComponentNode: - // Always update the cache both the arguments and exports, since both might - // change when a component gets re-evaluated. We also want to cache the arguments and exports in case of an error - l.cache.CacheArguments(c.ID(), c.Arguments()) + // Always update the cache both the exports, since that it might change when a component gets re-evaluated. + // We also want to cache it in case of an error l.cache.CacheExports(c.ID(), c.Exports()) case *ArgumentConfigNode: - if _, found := l.cache.moduleArguments[c.Label()]; !found { + if _, found := l.cache.GetModuleArgument(c.Label()); !found { if c.Optional() { l.cache.CacheModuleArgument(c.Label(), c.Default()) } else { diff --git a/internal/runtime/internal/controller/value_cache.go b/internal/runtime/internal/controller/value_cache.go index cdce5825a7..a3a2238428 100644 --- a/internal/runtime/internal/controller/value_cache.go +++ b/internal/runtime/internal/controller/value_cache.go @@ -2,6 +2,7 @@ package controller import ( "reflect" + "strings" "sync" "github.com/grafana/alloy/internal/component" @@ -15,46 +16,26 @@ import ( // components to be evaluated. type valueCache struct { mut sync.RWMutex - components map[string]ComponentID // NodeID -> ComponentID - args map[string]interface{} // NodeID -> component arguments value - exports map[string]interface{} // NodeID -> component exports value - moduleArguments map[string]any // key -> module arguments value - moduleExports map[string]any // name -> value for the value of module exports - moduleChangedIndex int // Everytime a change occurs this is incremented - scope *vm.Scope // scope provides additional context for the nodes in the module + moduleExports map[string]any // name -> value for the value of module exports + moduleChangedIndex int // Everytime a change occurs this is incremented + scope *vm.Scope // scope provides additional context for the nodes in the module } // newValueCache creates a new ValueCache. func newValueCache() *valueCache { return &valueCache{ - components: make(map[string]ComponentID), - args: make(map[string]interface{}), - exports: make(map[string]interface{}), - moduleArguments: make(map[string]any), - moduleExports: make(map[string]any), + moduleExports: make(map[string]any), + scope: vm.NewScope(make(map[string]interface{})), } } -func (vc *valueCache) SetScope(scope *vm.Scope) { - vc.mut.Lock() - defer vc.mut.Unlock() - vc.scope = scope -} - -// CacheArguments will cache the provided arguments by the given id. args may -// be nil to store an empty object. -func (vc *valueCache) CacheArguments(id ComponentID, args component.Arguments) { +func (vc *valueCache) UpdateScopeVariables(variables map[string]interface{}) { + if variables == nil { + return + } vc.mut.Lock() defer vc.mut.Unlock() - - nodeID := id.String() - vc.components[nodeID] = id - - var argsVal interface{} = make(map[string]interface{}) - if args != nil { - argsVal = args - } - vc.args[nodeID] = argsVal + vc.scope.Variables = deepCopyMap(variables) } // CacheExports will cache the provided exports using the given id. exports may @@ -63,14 +44,30 @@ func (vc *valueCache) CacheExports(id ComponentID, exports component.Exports) { vc.mut.Lock() defer vc.mut.Unlock() - nodeID := id.String() - vc.components[nodeID] = id + variables := vc.scope.Variables + for i, t := range id { + if _, ok := variables[t]; !ok { + variables[t] = make(map[string]interface{}) + } + if i == len(id)-1 { + break + } + variables = variables[t].(map[string]interface{}) + } - var exportsVal interface{} = make(map[string]interface{}) if exports != nil { - exportsVal = exports + variables[id[len(id)-1]] = exports + } +} + +func (vc *valueCache) GetModuleArgument(key string) (interface{}, bool) { + vc.mut.RLock() + defer vc.mut.RUnlock() + if _, ok := vc.scope.Variables["argument"]; !ok { + return nil, false } - vc.exports[nodeID] = exportsVal + v, exist := vc.scope.Variables["argument"].(map[string]interface{})[key] + return v, exist } // CacheModuleArgument will cache the provided exports using the given id. @@ -78,11 +75,12 @@ func (vc *valueCache) CacheModuleArgument(key string, value any) { vc.mut.Lock() defer vc.mut.Unlock() - if value == nil { - vc.moduleArguments[key] = nil - } else { - vc.moduleArguments[key] = value + if _, ok := vc.scope.Variables["argument"]; !ok { + vc.scope.Variables["argument"] = make(map[string]interface{}) } + keyMap := make(map[string]any) + keyMap["value"] = value + vc.scope.Variables["argument"].(map[string]interface{})[key] = keyMap } // CacheModuleExportValue saves the value to the map @@ -134,22 +132,34 @@ func (vc *valueCache) ExportChangeIndex() int { // ids. SyncIDs should be called with the current set of components after the // graph is updated. func (vc *valueCache) SyncIDs(ids []ComponentID) { - expectMap := make(map[string]ComponentID, len(ids)) - for _, id := range ids { - expectMap[id.String()] = id - } - vc.mut.Lock() defer vc.mut.Unlock() - for id := range vc.components { - if _, keep := expectMap[id]; keep { - continue + validPaths := make(map[string]struct{}) + for _, id := range ids { + validPaths[strings.Join(id, ".")] = struct{}{} + } + + var cleanMap func(map[string]interface{}, []string) + cleanMap = func(currentMap map[string]interface{}, currentPath []string) { + for key, value := range currentMap { + nextPath := append(currentPath, key) + pathKey := strings.Join(nextPath, ".") + if _, ok := validPaths[pathKey]; !ok { + // If not a valid path and not a map, delete it + if subMap, isMap := value.(map[string]interface{}); isMap { + cleanMap(subMap, nextPath) // Recurse for nested maps + if len(subMap) == 0 { + delete(currentMap, key) // Remove empty maps + } + } else { + delete(currentMap, key) + } + } } - delete(vc.components, id) - delete(vc.args, id) - delete(vc.exports, id) } + + cleanMap(vc.scope.Variables, []string{}) } // SyncModuleArgs will remove any cached values for any args no longer in the map. @@ -157,108 +167,37 @@ func (vc *valueCache) SyncModuleArgs(args map[string]any) { vc.mut.Lock() defer vc.mut.Unlock() - for id := range vc.moduleArguments { - if _, keep := args[id]; keep { - continue - } - delete(vc.moduleArguments, id) - } -} - -// BuildContext builds a vm.Scope based on the current set of cached values. -// The arguments and exports for the same ID are merged into one object. -func (vc *valueCache) BuildContext() *vm.Scope { - vc.mut.RLock() - defer vc.mut.RUnlock() - - scope := vm.NewScope(make(map[string]interface{})) - - // First, partition components by Alloy block name. - var componentsByBlockName = make(map[string][]ComponentID) - for _, id := range vc.components { - blockName := id[0] - componentsByBlockName[blockName] = append(componentsByBlockName[blockName], id) - } - - // Then, convert each partition into a single value. - for blockName, ids := range componentsByBlockName { - scope.Variables[blockName] = vc.buildValue(ids, 1) - } - - // Add module arguments to the scope. - if len(vc.moduleArguments) > 0 { - scope.Variables["argument"] = make(map[string]any) + if _, ok := vc.scope.Variables["argument"]; !ok { + return } - for key, value := range vc.moduleArguments { - keyMap := make(map[string]any) - keyMap["value"] = value - switch args := scope.Variables["argument"].(type) { - case map[string]any: - args[key] = keyMap + argsMap := vc.scope.Variables["argument"].(map[string]any) + for arg := range argsMap { + if _, ok := args[arg]; !ok { + delete(argsMap, arg) } } - - if vc.scope != nil { - // Merges the current scope with the one built from the components at this level. - scope.Variables = deepMergeMaps(vc.scope.Variables, scope.Variables) + if len(argsMap) == 0 { + delete(vc.scope.Variables, "argument") } - - return scope } -// buildValue recursively converts the set of user components into a single -// value. offset is used to determine which element in the userComponentName -// we're looking at. -func (vc *valueCache) buildValue(from []ComponentID, offset int) interface{} { - // We can't recurse anymore; return the node directly. - if len(from) == 1 && offset >= len(from[0]) { - name := from[0].String() - - // TODO(rfratto): should we allow arguments to be returned so users can - // reference arguments as well as exports? - exports, ok := vc.exports[name] - if !ok { - exports = make(map[string]interface{}) - } - return exports - } - - attrs := make(map[string]interface{}) - - // First, partition the components by their label. - var componentsByLabel = make(map[string][]ComponentID) - for _, id := range from { - blockName := id[offset] - componentsByLabel[blockName] = append(componentsByLabel[blockName], id) - } - - // Then, convert each partition into a single value. - for label, ids := range componentsByLabel { - attrs[label] = vc.buildValue(ids, offset+1) - } - return attrs +// GetScope returns the current scope. +func (vc *valueCache) GetScope() *vm.Scope { + vc.mut.RLock() + defer vc.mut.RUnlock() + return vm.NewScope(deepCopyMap(vc.scope.Variables)) } -// deepMergeMaps merges two maps. It uses the values of map2 in case of conflict. -func deepMergeMaps(map1, map2 map[string]any) map[string]any { - merged := make(map[string]any, len(map1)+len(map2)) - - for key, value := range map1 { - merged[key] = value - } - - for key, value := range map2 { - if existingValue, exists := merged[key]; exists { - if map1Value, ok1 := existingValue.(map[string]any); ok1 { - if map2Value, ok2 := value.(map[string]any); ok2 { - merged[key] = deepMergeMaps(map1Value, map2Value) - continue - } - } +func deepCopyMap(original map[string]any) map[string]any { + newMap := make(map[string]any, len(original)) + for key, value := range original { + switch v := value.(type) { + case map[string]any: + newMap[key] = deepCopyMap(v) + default: + newMap[key] = v } - merged[key] = value } - - return merged + return newMap } diff --git a/internal/runtime/internal/controller/value_cache_test.go b/internal/runtime/internal/controller/value_cache_test.go index d481046371..fbb184004a 100644 --- a/internal/runtime/internal/controller/value_cache_test.go +++ b/internal/runtime/internal/controller/value_cache_test.go @@ -54,15 +54,12 @@ func TestValueCache(t *testing.T) { // For now, only exports are placed in generated objects, which is why the // bar values are empty and the foo object only contains the exports. - vc.CacheArguments(ComponentID{"foo"}, fooArgs{Something: true}) vc.CacheExports(ComponentID{"foo"}, fooExports{SomethingElse: true}) - vc.CacheArguments(ComponentID{"bar", "label_a"}, barArgs{Number: 12}) - vc.CacheArguments(ComponentID{"bar", "label_b"}, barArgs{Number: 34}) - res := vc.BuildContext() + res := vc.GetScope() var ( - expectKeys = []string{"foo", "bar"} + expectKeys = []string{"foo"} actualKeys []string ) for varName := range res.Variables { @@ -73,12 +70,7 @@ func TestValueCache(t *testing.T) { type object = map[string]interface{} expectFoo := fooExports{SomethingElse: true} - expectBar := object{ - "label_a": object{}, // no exports for bar - "label_b": object{}, // no exports for bar - } require.Equal(t, expectFoo, res.Variables["foo"]) - require.Equal(t, expectBar, res.Variables["bar"]) } func TestExportValueCache(t *testing.T) { @@ -157,37 +149,36 @@ func TestModuleArgumentCache(t *testing.T) { vc.CacheModuleArgument("arg", tc.argValue) // Build the scope and validate it - res := vc.BuildContext() + res := vc.GetScope() expected := map[string]any{"arg": map[string]any{"value": tc.argValue}} require.Equal(t, expected, res.Variables["argument"]) // Sync arguments where the arg shouldn't change syncArgs := map[string]any{"arg": tc.argValue} vc.SyncModuleArgs(syncArgs) - res = vc.BuildContext() + res = vc.GetScope() require.Equal(t, expected, res.Variables["argument"]) // Sync arguments where the arg should clear out syncArgs = map[string]any{} vc.SyncModuleArgs(syncArgs) - res = vc.BuildContext() + res = vc.GetScope() require.Equal(t, map[string]any{}, res.Variables) }) } } -func TestBuildContextWithScope(t *testing.T) { +func TestScope(t *testing.T) { vc := newValueCache() - scope := vm.NewScope( + vc.scope = vm.NewScope( map[string]any{ "test": map[string]any{ "scope": barArgs{Number: 13}, }, }, ) - vc.SetScope(scope) vc.CacheExports(ComponentID{"foo", "bar"}, barArgs{Number: 12}) - res := vc.BuildContext() + res := vc.GetScope() expected := map[string]any{ "test": map[string]any{ @@ -204,18 +195,17 @@ func TestBuildContextWithScope(t *testing.T) { require.Equal(t, expected, res.Variables) } -func TestBuildContextWithScopeConflict(t *testing.T) { +func TestScopeSameNamespace(t *testing.T) { vc := newValueCache() - scope := vm.NewScope( + vc.scope = vm.NewScope( map[string]any{ "test": map[string]any{ "scope": barArgs{Number: 13}, }, }, ) - vc.SetScope(scope) vc.CacheExports(ComponentID{"test", "bar"}, barArgs{Number: 12}) - res := vc.BuildContext() + res := vc.GetScope() expected := map[string]any{ "test": map[string]any{ @@ -230,18 +220,17 @@ func TestBuildContextWithScopeConflict(t *testing.T) { require.Equal(t, expected, res.Variables) } -func TestBuildContextWithScopeOverride(t *testing.T) { +func TestScopeOverride(t *testing.T) { vc := newValueCache() - scope := vm.NewScope( + vc.scope = vm.NewScope( map[string]any{ "test": map[string]any{ "scope": barArgs{Number: 13}, }, }, ) - vc.SetScope(scope) vc.CacheExports(ComponentID{"test", "scope"}, barArgs{Number: 12}) - res := vc.BuildContext() + res := vc.GetScope() expected := map[string]any{ "test": map[string]any{ @@ -251,20 +240,31 @@ func TestBuildContextWithScopeOverride(t *testing.T) { }, } require.Equal(t, expected, res.Variables) +} - originalScope := vm.NewScope( +func TestScopePathOverride(t *testing.T) { + vc := newValueCache() + vc.scope = vm.NewScope( map[string]any{ "test": map[string]any{ "scope": barArgs{Number: 13}, }, }, ) - require.Equal(t, vc.scope, originalScope) // ensure that the original scope is not modified after building the context + vc.CacheExports(ComponentID{"test"}, barArgs{Number: 12}) + res := vc.GetScope() + + expected := map[string]any{ + "test": barArgs{ + Number: 12, + }, + } + require.Equal(t, expected, res.Variables) } -func TestScopeMergeMoreNesting(t *testing.T) { +func TestScopeComplex(t *testing.T) { vc := newValueCache() - scope := vm.NewScope( + vc.scope = vm.NewScope( map[string]any{ "test": map[string]any{ "cp1": map[string]any{ @@ -274,10 +274,9 @@ func TestScopeMergeMoreNesting(t *testing.T) { }, }, ) - vc.SetScope(scope) vc.CacheExports(ComponentID{"test", "cp1", "foo"}, barArgs{Number: 12}) vc.CacheExports(ComponentID{"test", "cp1", "bar", "fizz"}, barArgs{Number: 2}) - res := vc.BuildContext() + res := vc.GetScope() expected := map[string]any{ "test": map[string]any{ @@ -293,3 +292,42 @@ func TestScopeMergeMoreNesting(t *testing.T) { } require.Equal(t, expected, res.Variables) } + +func TestSyncIds(t *testing.T) { + vc := newValueCache() + vc.scope = vm.NewScope( + map[string]any{ + "test": map[string]any{ + "cp1": map[string]any{ + "scope": barArgs{Number: 13}, + }, + "cp2": barArgs{Number: 12}, + }, + "test2": map[string]any{ + "cp1": map[string]any{ + "scope": barArgs{Number: 13}, + }, + }, + "test3": 5, + }, + ) + vc.CacheExports(ComponentID{"test", "cp1", "bar", "fizz"}, barArgs{Number: 2}) + vc.SyncIDs( + []ComponentID{ + {"test", "cp2"}, + {"test", "cp1", "bar", "fizz"}, + }, + ) + expected := map[string]any{ + "test": map[string]any{ + "cp1": map[string]any{ + "bar": map[string]any{ + "fizz": barArgs{Number: 2}, + }, + }, + "cp2": barArgs{Number: 12}, + }, + } + res := vc.GetScope() + require.Equal(t, expected, res.Variables) +} diff --git a/syntax/vm/vm.go b/syntax/vm/vm.go index 9a61f395b8..544d6574fe 100644 --- a/syntax/vm/vm.go +++ b/syntax/vm/vm.go @@ -472,12 +472,13 @@ func NewScope(variables map[string]interface{}) *Scope { // Lookup looks up a named identifier from the scope and the stdlib. func (s *Scope) Lookup(name string) (interface{}, bool) { - // Traverse the scope first, then fall back to stdlib. + // Check the scope first. if s != nil { if val, ok := s.Variables[name]; ok { return val, true } } + // Falls back to the stdlik. if ident, ok := stdlib.Identifiers[name]; ok { return ident, true }