Skip to content

Commit

Permalink
refactor value_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed Nov 27, 2024
1 parent cc9c0a3 commit 1a1f1a9
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 187 deletions.
16 changes: 9 additions & 7 deletions internal/runtime/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ func (l *Loader) Apply(options ApplyOptions) diag.Diagnostics {
l.cm.controllerEvaluation.Set(1)
defer l.cm.controllerEvaluation.Set(0)

l.cache.SetScope(options.ArgScope)
if options.ArgScope != nil {
l.cache.UpdateScopeVariables(options.ArgScope.Variables)
}

for key, value := range options.Args {
l.cache.CacheModuleArgument(key, value)
Expand Down Expand Up @@ -615,7 +617,7 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics {

// Finally, wire component references.
l.cache.mut.RLock()
refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.scope, l.globals.MinStability)
refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.GetScope(), l.globals.MinStability)
l.cache.mut.RUnlock()
for _, ref := range refs {
g.AddEdge(dag.Edge{From: n, To: ref.Target})
Expand Down Expand Up @@ -645,7 +647,7 @@ func (l *Loader) wireCustomComponentNode(g *dag.Graph, cc *CustomComponentNode)
// Variables returns the Variables the Loader exposes for other components to
// reference.
func (l *Loader) Variables() map[string]interface{} {
return l.cache.BuildContext().Variables
return l.cache.GetScope().Variables
}

// Components returns the current set of loaded components.
Expand Down Expand Up @@ -780,7 +782,7 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr
var err error
switch n := n.(type) {
case BlockNode:
ectx := l.cache.BuildContext()
ectx := l.cache.GetScope()

// RLock before evaluate to prevent Evaluating while the config is being reloaded
l.mut.RLock()
Expand Down Expand Up @@ -819,7 +821,7 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr
// evaluate constructs the final context for the BlockNode and
// evaluates it. mut must be held when calling evaluate.
func (l *Loader) evaluate(logger log.Logger, bn BlockNode) error {
ectx := l.cache.BuildContext()
ectx := l.cache.GetScope()
err := bn.Evaluate(ectx)
return l.postEvaluate(logger, bn, err)
}
Expand All @@ -831,10 +833,10 @@ func (l *Loader) postEvaluate(logger log.Logger, bn BlockNode, err error) error
case ComponentNode:
// Always update the cache both the arguments and exports, since both might
// change when a component gets re-evaluated. We also want to cache the arguments and exports in case of an error
l.cache.CacheArguments(c.ID(), c.Arguments())
//l.cache.CacheArguments(c.ID(), c.Arguments())
l.cache.CacheExports(c.ID(), c.Exports())
case *ArgumentConfigNode:
if _, found := l.cache.moduleArguments[c.Label()]; !found {
if _, found := l.cache.GetModuleArgument(c.Label()); !found {
if c.Optional() {
l.cache.CacheModuleArgument(c.Label(), c.Default())
} else {
Expand Down
229 changes: 86 additions & 143 deletions internal/runtime/internal/controller/value_cache.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package controller

import (
"fmt"
"reflect"
"strings"
"sync"

"github.com/grafana/alloy/internal/component"
Expand All @@ -15,46 +17,26 @@ import (
// components to be evaluated.
type valueCache struct {
mut sync.RWMutex
components map[string]ComponentID // NodeID -> ComponentID
args map[string]interface{} // NodeID -> component arguments value
exports map[string]interface{} // NodeID -> component exports value
moduleArguments map[string]any // key -> module arguments value
moduleExports map[string]any // name -> value for the value of module exports
moduleChangedIndex int // Everytime a change occurs this is incremented
scope *vm.Scope // scope provides additional context for the nodes in the module
moduleExports map[string]any // name -> value for the value of module exports
moduleChangedIndex int // Everytime a change occurs this is incremented
scope *vm.Scope // scope provides additional context for the nodes in the module
}

// newValueCache creates a new ValueCache.
func newValueCache() *valueCache {
return &valueCache{
components: make(map[string]ComponentID),
args: make(map[string]interface{}),
exports: make(map[string]interface{}),
moduleArguments: make(map[string]any),
moduleExports: make(map[string]any),
moduleExports: make(map[string]any),
scope: vm.NewScope(make(map[string]interface{})),
}
}

func (vc *valueCache) SetScope(scope *vm.Scope) {
func (vc *valueCache) UpdateScopeVariables(variables map[string]interface{}) {
if variables == nil {
return
}
vc.mut.Lock()
defer vc.mut.Unlock()
vc.scope = scope
}

// CacheArguments will cache the provided arguments by the given id. args may
// be nil to store an empty object.
func (vc *valueCache) CacheArguments(id ComponentID, args component.Arguments) {
vc.mut.Lock()
defer vc.mut.Unlock()

nodeID := id.String()
vc.components[nodeID] = id

var argsVal interface{} = make(map[string]interface{})
if args != nil {
argsVal = args
}
vc.args[nodeID] = argsVal
vc.scope.Variables = deepCopyMap(variables)
}

// CacheExports will cache the provided exports using the given id. exports may
Expand All @@ -63,26 +45,46 @@ func (vc *valueCache) CacheExports(id ComponentID, exports component.Exports) {
vc.mut.Lock()
defer vc.mut.Unlock()

nodeID := id.String()
vc.components[nodeID] = id
variables := vc.scope.Variables
for i, t := range id {
if _, ok := variables[t]; !ok {
variables[t] = make(map[string]interface{})
}
if i == len(id)-1 {
break
}
if variables[t] == nil {
fmt.Println(variables, id)
}
variables = variables[t].(map[string]interface{})
}

var exportsVal interface{} = make(map[string]interface{})
if exports != nil {
exportsVal = exports
variables[id[len(id)-1]] = exports
}
vc.exports[nodeID] = exportsVal
}

func (vc *valueCache) GetModuleArgument(key string) (interface{}, bool) {
vc.mut.RLock()
defer vc.mut.RUnlock()
if _, ok := vc.scope.Variables["argument"]; !ok {
return nil, false
}
v, exist := vc.scope.Variables["argument"].(map[string]interface{})[key]
return v, exist
}

// CacheModuleArgument will cache the provided exports using the given id.
func (vc *valueCache) CacheModuleArgument(key string, value any) {
vc.mut.Lock()
defer vc.mut.Unlock()

if value == nil {
vc.moduleArguments[key] = nil
} else {
vc.moduleArguments[key] = value
if _, ok := vc.scope.Variables["argument"]; !ok {
vc.scope.Variables["argument"] = make(map[string]interface{})
}
keyMap := make(map[string]any)
keyMap["value"] = value
vc.scope.Variables["argument"].(map[string]interface{})[key] = keyMap
}

// CacheModuleExportValue saves the value to the map
Expand Down Expand Up @@ -134,131 +136,72 @@ func (vc *valueCache) ExportChangeIndex() int {
// ids. SyncIDs should be called with the current set of components after the
// graph is updated.
func (vc *valueCache) SyncIDs(ids []ComponentID) {
expectMap := make(map[string]ComponentID, len(ids))
for _, id := range ids {
expectMap[id.String()] = id
}

vc.mut.Lock()
defer vc.mut.Unlock()

for id := range vc.components {
if _, keep := expectMap[id]; keep {
continue
validPaths := make(map[string]struct{})
for _, id := range ids {
validPaths[strings.Join(id, ".")] = struct{}{}
}

var cleanMap func(map[string]interface{}, []string)
cleanMap = func(currentMap map[string]interface{}, currentPath []string) {
for key, value := range currentMap {
nextPath := append(currentPath, key)
pathKey := strings.Join(nextPath, ".")
if _, ok := validPaths[pathKey]; !ok {
// If not a valid path and not a map, delete it
if subMap, isMap := value.(map[string]interface{}); isMap {
cleanMap(subMap, nextPath) // Recurse for nested maps
if len(subMap) == 0 {
delete(currentMap, key) // Remove empty maps
}
} else {
delete(currentMap, key)
}
}
}
delete(vc.components, id)
delete(vc.args, id)
delete(vc.exports, id)
}

cleanMap(vc.scope.Variables, []string{})
}

// SyncModuleArgs will remove any cached values for any args no longer in the map.
func (vc *valueCache) SyncModuleArgs(args map[string]any) {
vc.mut.Lock()
defer vc.mut.Unlock()

for id := range vc.moduleArguments {
if _, keep := args[id]; keep {
continue
}
delete(vc.moduleArguments, id)
if _, ok := vc.scope.Variables["argument"]; !ok {
return
}
}

// BuildContext builds a vm.Scope based on the current set of cached values.
// The arguments and exports for the same ID are merged into one object.
func (vc *valueCache) BuildContext() *vm.Scope {
vc.mut.RLock()
defer vc.mut.RUnlock()

scope := vm.NewScope(make(map[string]interface{}))

// First, partition components by Alloy block name.
var componentsByBlockName = make(map[string][]ComponentID)
for _, id := range vc.components {
blockName := id[0]
componentsByBlockName[blockName] = append(componentsByBlockName[blockName], id)
}

// Then, convert each partition into a single value.
for blockName, ids := range componentsByBlockName {
scope.Variables[blockName] = vc.buildValue(ids, 1)
}

// Add module arguments to the scope.
if len(vc.moduleArguments) > 0 {
scope.Variables["argument"] = make(map[string]any)
}
for key, value := range vc.moduleArguments {
keyMap := make(map[string]any)
keyMap["value"] = value

switch args := scope.Variables["argument"].(type) {
case map[string]any:
args[key] = keyMap
argsMap := vc.scope.Variables["argument"].(map[string]any)
for arg := range argsMap {
if _, ok := args[arg]; !ok {
delete(argsMap, arg)
}
}

if vc.scope != nil {
// Merges the current scope with the one built from the components at this level.
scope.Variables = deepMergeMaps(vc.scope.Variables, scope.Variables)
if len(argsMap) == 0 {
delete(vc.scope.Variables, "argument")
}

return scope
}

// buildValue recursively converts the set of user components into a single
// value. offset is used to determine which element in the userComponentName
// we're looking at.
func (vc *valueCache) buildValue(from []ComponentID, offset int) interface{} {
// We can't recurse anymore; return the node directly.
if len(from) == 1 && offset >= len(from[0]) {
name := from[0].String()

// TODO(rfratto): should we allow arguments to be returned so users can
// reference arguments as well as exports?
exports, ok := vc.exports[name]
if !ok {
exports = make(map[string]interface{})
}
return exports
}

attrs := make(map[string]interface{})

// First, partition the components by their label.
var componentsByLabel = make(map[string][]ComponentID)
for _, id := range from {
blockName := id[offset]
componentsByLabel[blockName] = append(componentsByLabel[blockName], id)
}

// Then, convert each partition into a single value.
for label, ids := range componentsByLabel {
attrs[label] = vc.buildValue(ids, offset+1)
}
return attrs
// GetScope returns the current scope.
func (vc *valueCache) GetScope() *vm.Scope {
vc.mut.RLock()
defer vc.mut.RUnlock()
return vm.NewScope(deepCopyMap(vc.scope.Variables))
}

// deepMergeMaps merges two maps. It uses the values of map2 in case of conflict.
func deepMergeMaps(map1, map2 map[string]any) map[string]any {
merged := make(map[string]any, len(map1)+len(map2))

for key, value := range map1 {
merged[key] = value
}

for key, value := range map2 {
if existingValue, exists := merged[key]; exists {
if map1Value, ok1 := existingValue.(map[string]any); ok1 {
if map2Value, ok2 := value.(map[string]any); ok2 {
merged[key] = deepMergeMaps(map1Value, map2Value)
continue
}
}
func deepCopyMap(original map[string]any) map[string]any {
newMap := make(map[string]any, len(original))
for key, value := range original {
switch v := value.(type) {
case map[string]any:
newMap[key] = deepCopyMap(v)
default:
newMap[key] = v
}
merged[key] = value
}

return merged
return newMap
}
Loading

0 comments on commit 1a1f1a9

Please sign in to comment.