diff --git a/demo/.gitignore b/demo/.gitignore new file mode 100644 index 0000000000..3fec32c842 --- /dev/null +++ b/demo/.gitignore @@ -0,0 +1 @@ +tmp/ diff --git a/demo/consumer-cluster.alloy b/demo/consumer-cluster.alloy new file mode 100644 index 0000000000..c511d72573 --- /dev/null +++ b/demo/consumer-cluster.alloy @@ -0,0 +1,67 @@ + +logging { + // level = "debug" + level = "info" +} + +prometheus.scrape_task.receive.fake "a" { + forward_to = [ + prometheus.scrape_task.scrape.a.receiver, + ] + // This should match the producer.alloy + scrape_interval = "15s" + targets_count = 1000 +} + +prometheus.scrape_task.scrape "a" { + forward_to = [prometheus.scrape_task.send.a.receiver] + pool_size = 20 +} + +prometheus.scrape_task.send "a" { + +} + +// Traditional pipeline to publish metrics + +prometheus.exporter.self "self" {} + +discovery.relabel "replace_instance" { + targets = prometheus.exporter.self.self.targets + rule { + action = "replace" + source_labels = ["instance"] + target_label = "instance" + replacement = env("INSTANCE") + } + rule { + action = "replace" + source_labels = ["job"] + target_label = "job" + replacement = "integrations/alloy" + } +} + +prometheus.scrape "self" { + targets = discovery.relabel.replace_instance.output + forward_to = [prometheus.remote_write.cloud.receiver] + scrape_interval = "15s" +} + +prometheus.remote_write "cloud" { + endpoint { + url = "https://prometheus-prod-05-gb-south-0.grafana.net/api/prom/push" + + basic_auth { + username = "949385" + password_file = env("MIMIR_PWD_FILE") + } + } + + external_labels = { + "cluster" = "my-desk", + "namespace" = "alloy", + "alloy_instance" = env("INSTANCE"), + "role" = "cluster-consumer", + } +} diff --git a/test.alloy b/demo/consumer-redis.alloy similarity index 78% rename from test.alloy rename to demo/consumer-redis.alloy index 1c7fa41b34..e8271bc13f 100644 --- a/test.alloy +++ b/demo/consumer-redis.alloy @@ -4,16 +4,18 @@ logging { level = "info" } -prometheus.scrape_task.receive "a" { +prometheus.scrape_task.receive.redis "a" { forward_to = [ prometheus.scrape_task.scrape.a.receiver, ] - simulate_stable_assignment = true + concurrency = 1 + redis_address = "localhost:6379" } + prometheus.scrape_task.scrape "a" { forward_to = [prometheus.scrape_task.send.a.receiver] - pool_size = 10 + pool_size = 20 } prometheus.scrape_task.send "a" { @@ -30,13 +32,13 @@ discovery.relabel "replace_instance" { action = "replace" source_labels = ["instance"] target_label = "instance" - replacement = env("INSTANCE") + replacement = env("INSTANCE") } rule { action = "replace" source_labels = ["job"] target_label = "job" - replacement = "integrations/alloy" + replacement = "integrations/alloy" } } @@ -58,7 +60,8 @@ prometheus.remote_write "cloud" { external_labels = { "cluster" = "my-desk", - "namespace" = "alloy", - "alloy_instance" = env("INSTANCE"), + "namespace" = "alloy", + "alloy_instance" = env("INSTANCE"), + "role" = "redis-consumer", } } diff --git a/demo/producer.alloy b/demo/producer.alloy new file mode 100644 index 0000000000..6465e0a797 --- /dev/null +++ b/demo/producer.alloy @@ -0,0 +1,57 @@ + +logging { + // level = "debug" + level = "info" +} + +prometheus.scrape_task.produce.redis "a" { + redis_address = "localhost:6379" + // This should match the consumer-cluster.alloy + scrape_interval = "15s" + targets_count = 1000 +} + + +// Traditional pipeline to publish own metrics + +prometheus.exporter.self "self" {} + +discovery.relabel "replace_instance" { + targets = prometheus.exporter.self.self.targets + rule { + action = "replace" + source_labels = ["instance"] + target_label = "instance" + replacement = env("INSTANCE") + } + rule { + action = "replace" + source_labels = ["job"] + target_label = "job" + replacement = "integrations/alloy" + } +} + +prometheus.scrape "self" { + targets = discovery.relabel.replace_instance.output + forward_to = [prometheus.remote_write.cloud.receiver] + scrape_interval = "15s" +} + +prometheus.remote_write "cloud" { + endpoint { + url = "https://prometheus-prod-05-gb-south-0.grafana.net/api/prom/push" + + basic_auth { + username = "949385" + password_file = env("MIMIR_PWD_FILE") + } + } + + external_labels = { + "cluster" = "my-desk", + "namespace" = "alloy", + "alloy_instance" = env("INSTANCE"), + "role" = "producer", + } +} diff --git a/demo/reload-configs.sh b/demo/reload-configs.sh new file mode 100755 index 0000000000..f3c9e96a35 --- /dev/null +++ b/demo/reload-configs.sh @@ -0,0 +1,14 @@ +# producer +curl localhost:8000/-/reload + +# redis +curl localhost:8001/-/reload +curl localhost:8002/-/reload +curl localhost:8003/-/reload +curl localhost:8004/-/reload + +# cluster +curl localhost:8101/-/reload +curl localhost:8102/-/reload +curl localhost:8103/-/reload +curl localhost:8104/-/reload \ No newline at end of file diff --git a/demo/run-consumer-cluster.sh b/demo/run-consumer-cluster.sh new file mode 100755 index 0000000000..ea7dee8278 --- /dev/null +++ b/demo/run-consumer-cluster.sh @@ -0,0 +1,2 @@ +echo "running $1 on port $2" +INSTANCE=$1 MIMIR_PWD_FILE=~/workspace/go-playground/secrets/MIMIR_TOKEN alloy run --stability.level experimental consumer-cluster.alloy --cluster.name cluster --cluster.enabled --cluster.node-name "$1" --server.http.listen-addr "127.0.0.1:$2" --cluster.advertise-address "127.0.0.1:$2" --storage.path "./tmp/data_$1" --cluster.join-addresses "127.0.0.1:8101,127.0.0.1:8102,127.0.0.1:8103,127.0.0.1:8104" diff --git a/demo/run-consumer-redis.sh b/demo/run-consumer-redis.sh new file mode 100755 index 0000000000..dcc03eba5e --- /dev/null +++ b/demo/run-consumer-redis.sh @@ -0,0 +1,2 @@ +echo "running $1 on port $2" +INSTANCE=$1 MIMIR_PWD_FILE=~/workspace/go-playground/secrets/MIMIR_TOKEN alloy run --stability.level experimental consumer-redis.alloy --cluster.name redis --cluster.enabled --cluster.node-name "$1" --server.http.listen-addr "127.0.0.1:$2" --cluster.advertise-address "127.0.0.1:$2" --storage.path "./tmp/data_$1" --cluster.join-addresses "127.0.0.1:8001,127.0.0.1:8002,127.0.0.1:8003,127.0.0.1:8004" diff --git a/demo/run-producer.sh b/demo/run-producer.sh new file mode 100755 index 0000000000..7879f105d3 --- /dev/null +++ b/demo/run-producer.sh @@ -0,0 +1,3 @@ + +echo "running producer $1 on port $2" +INSTANCE=$1 MIMIR_PWD_FILE=~/workspace/go-playground/secrets/MIMIR_TOKEN alloy run --stability.level experimental producer.alloy --server.http.listen-addr "127.0.0.1:$2" --storage.path "./tmp/data_$1" diff --git a/demo/run_multiple.go b/demo/run_multiple.go new file mode 100644 index 0000000000..be0590dcfa --- /dev/null +++ b/demo/run_multiple.go @@ -0,0 +1,103 @@ +package main + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "os/exec" + "os/signal" + "strconv" + "sync" + "syscall" +) + +func main() { + if len(os.Args) < 5 { + fmt.Println("Usage: go run main.go ") + os.Exit(1) + } + + scriptPath := os.Args[1] + namePrefix := os.Args[2] + portStart, err := strconv.Atoi(os.Args[3]) + if err != nil { + fmt.Println("Invalid port start") + os.Exit(1) + } + numInstances, err := strconv.Atoi(os.Args[4]) + if err != nil { + fmt.Println("Error converting num instances to integer") + os.Exit(1) + } + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + // Trap SIGINT (CTRL+C) and SIGTERM to gracefully shutdown + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) + + // Run the script N times in parallel + for i := 0; i < numInstances; i++ { + wg.Add(1) + go func(instanceID int) { + defer wg.Done() + runScript(ctx, scriptPath, namePrefix, portStart, instanceID) + }(i) + } + + // Wait for CTRL+C or termination signal + <-signalChan + fmt.Println("\nShutting down...") + cancel() // Signal all scripts to stop + + wg.Wait() // Wait for all goroutines to finish + fmt.Println("All scripts stopped.") + os.Exit(0) +} + +// runScript runs a given script in a separate goroutine and pipes its output with a prefix +func runScript(ctx context.Context, scriptPath string, namePrefix string, portStart int, instanceID int) { + cmd := exec.CommandContext(ctx, "bash", scriptPath, namePrefix+"-"+strconv.Itoa(instanceID), strconv.Itoa(portStart+instanceID)) + + // Get the stdout and stderr pipes + stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + fmt.Printf("[Instance %d] Error creating stdout pipe: %v\n", instanceID, err) + return + } + + stderrPipe, err := cmd.StderrPipe() + if err != nil { + fmt.Printf("[Instance %d] Error creating stderr pipe: %v\n", instanceID, err) + return + } + + // Start the command + if err := cmd.Start(); err != nil { + fmt.Printf("[Instance %d] Failed to start script: %v\n", instanceID, err) + return + } + + // Log stdout and stderr with prefixes + go logOutput(fmt.Sprintf("[Instance %d] stdout: ", instanceID), stdoutPipe) + go logOutput(fmt.Sprintf("[Instance %d] stderr: ", instanceID), stderrPipe) + + // Wait for the command to complete + if err := cmd.Wait(); err != nil { + fmt.Printf("[Instance %d] Script exited with error: %v\n", instanceID, err) + } +} + +// logOutput reads from a pipe and logs each line with a given prefix +func logOutput(prefix string, pipe io.ReadCloser) { + scanner := bufio.NewScanner(pipe) + for scanner.Scan() { + fmt.Println(prefix + scanner.Text()) + } + if err := scanner.Err(); err != nil { + fmt.Printf("%s Error reading output: %v\n", prefix, err) + } +} diff --git a/go.mod b/go.mod index 9e5be6d237..e8faff8796 100644 --- a/go.mod +++ b/go.mod @@ -253,13 +253,13 @@ require ( golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 golang.org/x/net v0.31.0 golang.org/x/oauth2 v0.23.0 - golang.org/x/sys v0.27.0 + golang.org/x/sys v0.28.0 golang.org/x/text v0.20.0 - golang.org/x/time v0.6.0 + golang.org/x/time v0.8.0 golang.org/x/tools v0.25.0 google.golang.org/api v0.188.0 google.golang.org/grpc v1.67.1 - google.golang.org/protobuf v1.35.1 + google.golang.org/protobuf v1.35.2 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 gotest.tools v2.2.0+incompatible @@ -739,7 +739,7 @@ require ( github.com/sony/gobreaker v0.5.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect - github.com/spf13/cast v1.6.0 // indirect + github.com/spf13/cast v1.7.0 // indirect github.com/spf13/jwalterweatherman v1.0.0 // indirect github.com/spf13/viper v1.19.0 // indirect github.com/stormcat24/protodep v0.1.8 // indirect @@ -833,10 +833,13 @@ require ( github.com/antchfx/xpath v1.3.2 // indirect github.com/ebitengine/purego v0.8.0 // indirect github.com/elastic/lunes v0.1.0 // indirect + github.com/hibiken/asynq v0.25.0 // indirect github.com/mdlayher/vsock v1.2.1 // indirect github.com/moby/sys/userns v0.1.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.112.0 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect + github.com/redis/go-redis/v9 v9.7.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect go.opentelemetry.io/collector/connector/connectorprofiles v0.112.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.112.0 // indirect go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles v0.112.0 // indirect diff --git a/go.sum b/go.sum index e56e898a66..512e4209a4 100644 --- a/go.sum +++ b/go.sum @@ -2093,6 +2093,8 @@ github.com/hetznercloud/hcloud-go/v2 v2.10.2 h1:9gyTUPhfNbfbS40Spgij5mV5k37bOZgt github.com/hetznercloud/hcloud-go/v2 v2.10.2/go.mod h1:xQ+8KhIS62W0D78Dpi57jsufWh844gUw1az5OUvaeq8= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= +github.com/hibiken/asynq v0.25.0 h1:VCPyRRrrjFChsTSI8x5OCPu51MlEz6Rk+1p0kHKnZug= +github.com/hibiken/asynq v0.25.0/go.mod h1:DYQ1etBEl2Y+uSkqFElGYbk3M0ujLVwCfWE+TlvxtEk= github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E= github.com/hodgesds/perf-utils v0.7.0 h1:7KlHGMuig4FRH5fNw68PV6xLmgTe7jKs9hgAcEAbioU= github.com/hodgesds/perf-utils v0.7.0/go.mod h1:LAklqfDadNKpkxoAJNHpD5tkY0rkZEVdnCEWN5k4QJY= @@ -2912,6 +2914,8 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/relvacode/iso8601 v1.4.0 h1:GsInVSEJfkYuirYFxa80nMLbH2aydgZpIf52gYZXUJs= github.com/relvacode/iso8601 v1.4.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I= github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E= @@ -2927,6 +2931,9 @@ github.com/richardartoul/molecule v1.0.1-0.20221107223329-32cfee06a052/go.mod h1 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -3053,6 +3060,8 @@ github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkU github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= +github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/cobra v1.5.0/go.mod h1:dWXEIy2H428czQCjInthrTRUg7yKbok+2Qi/yBIJoUM= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= @@ -3883,6 +3892,8 @@ golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -3939,6 +3950,8 @@ golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -4357,6 +4370,8 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 1e2595bfcb..2dfe4629fd 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -140,7 +140,9 @@ import ( _ "github.com/grafana/alloy/internal/component/prometheus/relabel" // Import prometheus.relabel _ "github.com/grafana/alloy/internal/component/prometheus/remotewrite" // Import prometheus.remote_write _ "github.com/grafana/alloy/internal/component/prometheus/scrape" // Import prometheus.scrape - _ "github.com/grafana/alloy/internal/component/prometheus/scrape_task/receive" // Import prometheus.scrape_task.receive + _ "github.com/grafana/alloy/internal/component/prometheus/scrape_task/produce/redis" // Import prometheus.scrape_task.produce.redis + _ "github.com/grafana/alloy/internal/component/prometheus/scrape_task/receive/fake" // Import prometheus.scrape_task.receive.fake + _ "github.com/grafana/alloy/internal/component/prometheus/scrape_task/receive/redis" // Import prometheus.scrape_task.receive.redis _ "github.com/grafana/alloy/internal/component/prometheus/scrape_task/scrape" // Import prometheus.scrape_task.scrape _ "github.com/grafana/alloy/internal/component/prometheus/scrape_task/send" // Import prometheus.scrape_task.send _ "github.com/grafana/alloy/internal/component/prometheus/write/queue" // Import prometheus.write.queue diff --git a/internal/component/prometheus/scrape_task/internal/faketasks/provider.go b/internal/component/prometheus/scrape_task/internal/faketasks/provider.go new file mode 100644 index 0000000000..9d55e9414a --- /dev/null +++ b/internal/component/prometheus/scrape_task/internal/faketasks/provider.go @@ -0,0 +1,62 @@ +package faketasks + +import ( + "sync" + "time" + + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/component/prometheus/scrape_task" + "github.com/grafana/alloy/internal/component/prometheus/scrape_task/internal/promstub" + "github.com/grafana/alloy/internal/component/prometheus/scrape_task/internal/random" +) + +type provider struct { + scrapeInterval time.Duration + tasksCount int + ticker *time.Ticker + lagNotificationHandler func(duration time.Duration) + + mut sync.Mutex + cached []scrape_task.ScrapeTask + lastPop time.Time +} + +func NewProvider(scrapeInterval time.Duration, tasksCount int, lagHandler func(duration time.Duration)) scrape_task.ScrapeTaskProvider { + if lagHandler == nil { + lagHandler = func(duration time.Duration) {} + } + return &provider{ + scrapeInterval: scrapeInterval, + tasksCount: tasksCount, + ticker: time.NewTicker(scrapeInterval), + lagNotificationHandler: lagHandler, + lastPop: time.Now().Add(2 * scrapeInterval), + } +} + +func (p *provider) Get() []scrape_task.ScrapeTask { + <-p.ticker.C // this limits the rate of scrape tasks produced, simulating scrape interval + p.mut.Lock() + defer p.mut.Unlock() + p.lagNotificationHandler(time.Since(p.lastPop)) + p.lastPop = time.Now() + if p.cached == nil { + p.cached = fakeScrapeTasks(p.tasksCount) + } + return p.cached +} + +func fakeScrapeTasks(count int) []scrape_task.ScrapeTask { + result := make([]scrape_task.ScrapeTask, count) + for i := 0; i < count; i++ { + numberOfSeries := random.NumberOfSeries(1_000, 100_000, 1_000) + result[i] = scrape_task.ScrapeTask{ + Target: discovery.Target{ + "host": "host_" + random.String(6), + "team": "team_" + random.String(1), + promstub.SeriesToGenerateLabel: numberOfSeries, + }, + } + } + return result +} diff --git a/internal/component/prometheus/scrape_task/internal/promstub/scraper.go b/internal/component/prometheus/scrape_task/internal/promstub/scraper.go index 96e4f3d0d6..123953ecb4 100644 --- a/internal/component/prometheus/scrape_task/internal/promstub/scraper.go +++ b/internal/component/prometheus/scrape_task/internal/promstub/scraper.go @@ -56,8 +56,8 @@ func (s scraper) ScrapeTarget(target discovery.Target) (promadapter.Metrics, err // Each series adds latency, so the more series, the more latency. random.SimulateLatency( - time.Nanosecond*10, // min - time.Nanosecond*100, // avg + time.Nanosecond*1, // min + time.Nanosecond*10, // avg time.Microsecond*100, // max time.Nanosecond*500, // stdev ) diff --git a/internal/component/prometheus/scrape_task/internal/promstub/sender.go b/internal/component/prometheus/scrape_task/internal/promstub/sender.go index 99a1e8b970..b25eb523cb 100644 --- a/internal/component/prometheus/scrape_task/internal/promstub/sender.go +++ b/internal/component/prometheus/scrape_task/internal/promstub/sender.go @@ -25,8 +25,8 @@ func (s *sender) Send(metrics []promadapter.Metrics) error { } // Each series adds latency, so the more series, the more latency. random.SimulateLatency( - time.Nanosecond*10, // min - time.Nanosecond*100, // avg + time.Nanosecond*1, // min + time.Nanosecond*10, // avg time.Microsecond*100, // max time.Nanosecond*500, // stdev ) diff --git a/internal/component/prometheus/scrape_task/internal/queuestub/queue.go b/internal/component/prometheus/scrape_task/internal/queuestub/queue.go deleted file mode 100644 index 6cc0a2c325..0000000000 --- a/internal/component/prometheus/scrape_task/internal/queuestub/queue.go +++ /dev/null @@ -1,59 +0,0 @@ -package queuestub - -import ( - "fmt" - "sync" - "time" - - "github.com/grafana/alloy/internal/component/discovery" - "github.com/grafana/alloy/internal/component/prometheus/scrape_task" - "github.com/grafana/alloy/internal/component/prometheus/scrape_task/internal/promstub" - "github.com/grafana/alloy/internal/component/prometheus/scrape_task/internal/random" -) - -// The scrape interval we simulate here - how often scrape tasks are available in the queue. -const scrapeInterval = 3 * time.Second - -// We only generate tasks once and keep them cached. To simulate same targets having same numbers of series. -var ( - cached []scrape_task.ScrapeTask = nil - mut sync.Mutex - ticker = time.NewTicker(scrapeInterval) - lastPop = time.Now().Add(scrapeInterval * 2) -) - -func PopTasks(count int) []scrape_task.ScrapeTask { - <-ticker.C // this limits the rate of scrape tasks produced, simulating scrape interval - - // TODO(thampiotr): Instead of this, in a real system we would monitor the queue depth. Kinda like WAL delay - a - // sign of congestion. - if time.Since(lastPop) > scrapeInterval*2 { - fmt.Println("=======> QUEUE IS NOT DRAINED FAST ENOUGH") - } - lastPop = time.Now() - - return fakeScrapeTasks(count) -} - -func fakeScrapeTasks(count int) []scrape_task.ScrapeTask { - mut.Lock() - defer mut.Unlock() - if cached != nil { - return cached - } - - result := make([]scrape_task.ScrapeTask, count) - for i := 0; i < count; i++ { - numberOfSeries := random.NumberOfSeries(5_000, 100_000, 1_000) - result[i] = scrape_task.ScrapeTask{ - Target: discovery.Target{ - "host": "host_" + random.String(6), - "team": "team_" + random.String(1), - promstub.SeriesToGenerateLabel: numberOfSeries, - }, - } - } - - cached = result - return result -} diff --git a/internal/component/prometheus/scrape_task/internal/random/random.go b/internal/component/prometheus/scrape_task/internal/random/random.go index 1090c8546d..002da51a0f 100644 --- a/internal/component/prometheus/scrape_task/internal/random/random.go +++ b/internal/component/prometheus/scrape_task/internal/random/random.go @@ -17,7 +17,7 @@ func NumberOfSeries(smallAvg int, bigAvg int, stdev int) string { mut.Lock() defer mut.Unlock() var n int - if r.Intn(20) == 0 { // 5% will be big + if r.Intn(50) == 0 { // 2% will be big n = int(r.NormFloat64()*float64(stdev) + float64(bigAvg)) } else { // 95% will be smaller n = int(r.NormFloat64()*float64(stdev) + float64(smallAvg)) diff --git a/internal/component/prometheus/scrape_task/produce/redis/redis.go b/internal/component/prometheus/scrape_task/produce/redis/redis.go new file mode 100644 index 0000000000..2999c16fab --- /dev/null +++ b/internal/component/prometheus/scrape_task/produce/redis/redis.go @@ -0,0 +1,151 @@ +package redis + +import ( + "context" + "encoding/json" + "slices" + "sync" + "time" + + "github.com/hibiken/asynq" + promclient "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/prometheus/scrape_task" + "github.com/grafana/alloy/internal/component/prometheus/scrape_task/internal/faketasks" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +func init() { + component.Register(component.Registration{ + Name: "prometheus.scrape_task.produce.redis", + Stability: featuregate.StabilityExperimental, + Args: Arguments{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +type Arguments struct { + TargetsCount int `alloy:"targets_count,attr,optional"` + ScrapeInterval time.Duration `alloy:"scrape_interval,attr,optional"` + RedisAddress string `alloy:"redis_address,attr,optional"` + BatchSize int `alloy:"batch_size,attr,optional"` +} + +func (a *Arguments) SetToDefault() { + a.TargetsCount = 100 + a.BatchSize = 10 + a.ScrapeInterval = time.Minute + a.RedisAddress = "localhost:6379" +} + +type Component struct { + opts component.Options + fakeTaskProvider scrape_task.ScrapeTaskProvider + taskLag promclient.Histogram + + mut sync.RWMutex + args Arguments +} + +func New(opts component.Options, args Arguments) (*Component, error) { + taskLag := promclient.NewHistogram(promclient.HistogramOpts{ + Name: "scrape_tasks_redis_tasks_write_lag_seconds", + Help: "The time a task spends waiting to be written to the queue", + Buckets: []float64{0.5, 1, 5, 10, 30, 60, 90, 180, 300, 600}, + }) + err := opts.Registerer.Register(taskLag) + if err != nil { + return nil, err + } + + c := &Component{ + opts: opts, + taskLag: taskLag, + } + + if err := c.Update(args); err != nil { + return nil, err + } + return c, nil +} + +// Run satisfies the Component interface. +func (c *Component) Run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + level.Info(c.opts.Logger).Log("msg", "terminating due to context done") + return nil + default: + c.mut.RLock() + taskProvider := c.fakeTaskProvider + redisAddr := c.args.RedisAddress + batchSize := c.args.BatchSize + scrapeInterval := c.args.ScrapeInterval + c.mut.RUnlock() + + // This will block for scrape interval until tasks are available. + tasks := taskProvider.Get() + + // TODO(thampiotr): we should ideally keep the same client around until address changes, but for this demo + // I'm avoiding the complexity of managing client updates. + client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr}) + + // Send in batches + for batch := range slices.Chunk(tasks, batchSize) { + payload, err := newScrapeTasksPayload(batch) + if err != nil { + level.Error(c.opts.Logger).Log("msg", "error creating scrape tasks payload", "err", err) + } + + info, err := client.Enqueue( + payload, + asynq.MaxRetry(3), + asynq.Queue("scrape_tasks"), + asynq.Timeout(scrapeInterval), + asynq.Unique(scrapeInterval*2), + ) + if err != nil { + level.Error(c.opts.Logger).Log("msg", "error enqueue scrape tasks", "err", err, "info", info) + } else { + level.Debug(c.opts.Logger).Log("msg", "scrape tasks enqueued", "count", len(batch), "id", info.ID, "queue", info.Queue) + } + } + + if err := client.Close(); err != nil { + level.Warn(c.opts.Logger).Log("msg", "failed to close redis client", "err", err) + } + } + } +} + +// Update satisfies the Component interface. +func (c *Component) Update(args component.Arguments) error { + c.mut.Lock() + defer c.mut.Unlock() + + c.args = args.(Arguments) + + c.fakeTaskProvider = faketasks.NewProvider( + c.args.ScrapeInterval, + c.args.TargetsCount, + func(duration time.Duration) { + c.taskLag.Observe(duration.Seconds()) + }, + ) + + return nil +} + +func newScrapeTasksPayload(tasks []scrape_task.ScrapeTask) (*asynq.Task, error) { + payload, err := json.Marshal(tasks) + if err != nil { + return nil, err + } + return asynq.NewTask(scrape_task.TaskTypeScrapeTaskV1, payload), nil +} diff --git a/internal/component/prometheus/scrape_task/receive/receive.go b/internal/component/prometheus/scrape_task/receive/fake/fake.go similarity index 63% rename from internal/component/prometheus/scrape_task/receive/receive.go rename to internal/component/prometheus/scrape_task/receive/fake/fake.go index edacde595d..9b8a200108 100644 --- a/internal/component/prometheus/scrape_task/receive/receive.go +++ b/internal/component/prometheus/scrape_task/receive/fake/fake.go @@ -4,13 +4,14 @@ import ( "context" "fmt" "sync" + "time" "github.com/grafana/ckit/shard" promclient "github.com/prometheus/client_golang/prometheus" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/prometheus/scrape_task" - "github.com/grafana/alloy/internal/component/prometheus/scrape_task/internal/queuestub" + "github.com/grafana/alloy/internal/component/prometheus/scrape_task/internal/faketasks" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/service/cluster" @@ -18,7 +19,7 @@ import ( func init() { component.Register(component.Registration{ - Name: "prometheus.scrape_task.receive", + Name: "prometheus.scrape_task.receive.fake", Stability: featuregate.StabilityExperimental, Args: Arguments{}, @@ -29,19 +30,22 @@ func init() { } type Arguments struct { - ForwardTo []scrape_task.ScrapeTaskConsumer `alloy:"forward_to,attr"` - BatchSize int `alloy:"batch_size,attr,optional"` - SimulateStableAssignment bool `alloy:"simulate_stable_assignment,attr,optional"` + ForwardTo []scrape_task.ScrapeTaskConsumer `alloy:"forward_to,attr"` + TargetsCount int `alloy:"targets_count,attr,optional"` + ScrapeInterval time.Duration `alloy:"scrape_interval,attr,optional"` } func (a *Arguments) SetToDefault() { - a.BatchSize = 100 + a.TargetsCount = 100 + a.ScrapeInterval = time.Minute } type Component struct { - opts component.Options - tasksCounter promclient.Counter - cluster cluster.Cluster + opts component.Options + tasksCounter promclient.Counter + taskLag promclient.Histogram + cluster cluster.Cluster + fakeTaskProvider scrape_task.ScrapeTaskProvider mut sync.RWMutex args Arguments @@ -49,13 +53,23 @@ type Component struct { func New(opts component.Options, args Arguments) (*Component, error) { tasksCounter := promclient.NewCounter(promclient.CounterOpts{ - Name: "scrape_tasks_tasks_processed_total", - Help: "Number of tasks the prometheus.scrape_task.receiver component has processed"}) + Name: "scrape_tasks_fake_tasks_processed_total", + Help: "Number of tasks the prometheus.scrape_task.receiver.fake component has processed"}) err := opts.Registerer.Register(tasksCounter) if err != nil { return nil, err } + taskLag := promclient.NewHistogram(promclient.HistogramOpts{ + Name: "scrape_tasks_fake_tasks_lag_seconds", + Help: "The time a task spends waiting to be picked up", + Buckets: []float64{0.5, 1, 5, 10, 30, 60, 90, 180, 300, 600}, + }) + err = opts.Registerer.Register(taskLag) + if err != nil { + return nil, err + } + clusterSvc, err := opts.GetServiceData(cluster.ServiceName) if err != nil { return nil, fmt.Errorf("failed to get information about cluster: %w", err) @@ -63,6 +77,7 @@ func New(opts component.Options, args Arguments) (*Component, error) { c := &Component{ opts: opts, tasksCounter: tasksCounter, + taskLag: taskLag, cluster: clusterSvc.(cluster.Cluster), } @@ -81,24 +96,17 @@ func (c *Component) Run(ctx context.Context) error { return nil default: c.mut.RLock() - batchSize := c.args.BatchSize - stableAssignment := c.args.SimulateStableAssignment + consumers := c.args.ForwardTo + taskProvider := c.fakeTaskProvider c.mut.RUnlock() - tasks := queuestub.PopTasks(batchSize) - if stableAssignment { - // If we want to simulate stable assignment, we will only process tasks that belong to this instance - // as determined by consistent hashing - this is similar to current clustering where targets are - // distributed between instances. - tasks = c.filterTasksWeOwn(tasks) - } + // This will block for scrape interval until tasks are available. + tasks := taskProvider.Get() + // Like our current stable cluster sharding - pick only targets that belong to local instance. + tasks = c.filterTasksWeOwn(tasks) + // Fan out the tasks level.Debug(c.opts.Logger).Log("msg", "forwarding scrape tasks to consumers", "count", len(tasks)) - - c.mut.RLock() - consumers := c.args.ForwardTo - c.mut.RUnlock() - for _, consumer := range consumers { consumer.Consume(tasks) } @@ -110,7 +118,7 @@ func (c *Component) Run(ctx context.Context) error { func (c *Component) filterTasksWeOwn(tasks []scrape_task.ScrapeTask) []scrape_task.ScrapeTask { var newTasks []scrape_task.ScrapeTask for _, task := range tasks { - // Extract host label from target + // Extract host label from target. This is a hack for demo purposes. sl := task.Target.SpecificLabels([]string{"host"}) if len(sl) != 1 { fmt.Println("missing host label") @@ -134,5 +142,14 @@ func (c *Component) Update(args component.Arguments) error { defer c.mut.Unlock() c.args = args.(Arguments) + + c.fakeTaskProvider = faketasks.NewProvider( + c.args.ScrapeInterval, + c.args.TargetsCount, + func(duration time.Duration) { + c.taskLag.Observe(duration.Seconds()) + }, + ) + return nil } diff --git a/internal/component/prometheus/scrape_task/receive/redis/redis.go b/internal/component/prometheus/scrape_task/receive/redis/redis.go new file mode 100644 index 0000000000..0a07d55c04 --- /dev/null +++ b/internal/component/prometheus/scrape_task/receive/redis/redis.go @@ -0,0 +1,173 @@ +package receive + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/hibiken/asynq" + promclient "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/prometheus/scrape_task" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +func init() { + component.Register(component.Registration{ + Name: "prometheus.scrape_task.receive.redis", + Stability: featuregate.StabilityExperimental, + Args: Arguments{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +type Arguments struct { + ForwardTo []scrape_task.ScrapeTaskConsumer `alloy:"forward_to,attr"` + Concurrency int `alloy:"concurrency,attr,optional"` + RedisAddress string `alloy:"redis_address,attr,optional"` +} + +func (a *Arguments) SetToDefault() { + a.Concurrency = 5 + a.RedisAddress = "localhost:6379" +} + +type Component struct { + opts component.Options + tasksCounter promclient.Counter + server *asynq.Server + serverUpdated chan struct{} + + mut sync.RWMutex + args Arguments +} + +func New(opts component.Options, args Arguments) (*Component, error) { + tasksCounter := promclient.NewCounter(promclient.CounterOpts{ + Name: "scrape_tasks_redis_tasks_processed_total", + Help: "Number of tasks the prometheus.scrape_task.receiver.redis component has processed"}) + err := opts.Registerer.Register(tasksCounter) + if err != nil { + return nil, err + } + + c := &Component{ + opts: opts, + tasksCounter: tasksCounter, + serverUpdated: make(chan struct{}, 1), + } + + if err := c.Update(args); err != nil { + return nil, err + } + return c, nil +} + +// Run satisfies the Component interface. +func (c *Component) Run(ctx context.Context) error { + defer func() { + c.mut.RLock() + defer c.mut.RUnlock() + if c.server != nil { + c.server.Shutdown() + } + }() + + oneServerAtATime := sync.Mutex{} + runServer := func() { + c.mut.RLock() + server := c.server + c.mut.RUnlock() + + // Only allow one server running at a time + oneServerAtATime.Lock() + defer oneServerAtATime.Unlock() + if err := server.Run(c.serverMux()); err != nil { + level.Warn(c.opts.Logger).Log("msg", "server exit with error", "err", err) + } + } + + for { + select { + case <-ctx.Done(): + level.Info(c.opts.Logger).Log("msg", "terminating due to context done") + return nil + case <-c.serverUpdated: + level.Info(c.opts.Logger).Log("msg", "starting new server after update") + go runServer() + } + } +} + +// Update satisfies the Component interface. +func (c *Component) Update(args component.Arguments) error { + c.mut.Lock() + defer c.mut.Unlock() + newArgs := args.(Arguments) + + if c.serverNeedsUpdate(newArgs) { + if c.server != nil { + c.server.Shutdown() + } + + srv := asynq.NewServer( + asynq.RedisClientOpt{Addr: c.args.RedisAddress}, + asynq.Config{ + // Specify how many concurrent workers to use + Concurrency: c.args.Concurrency, + // Optionally specify multiple queues with different priority. + Queues: map[string]int{ + "scrape_tasks": 10, + }, + }, + ) + c.server = srv + + // Signal server is updated. + select { + case c.serverUpdated <- struct{}{}: + default: + } + } + + c.args = newArgs + + return nil +} + +func (c *Component) serverMux() *asynq.ServeMux { + mux := asynq.NewServeMux() + mux.Handle(scrape_task.TaskTypeScrapeTaskV1, c) + return mux +} + +func (c *Component) ProcessTask(ctx context.Context, t *asynq.Task) error { + var tasks []scrape_task.ScrapeTask + if err := json.Unmarshal(t.Payload(), &tasks); err != nil { + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + } + + level.Info(c.opts.Logger).Log("msg", "=======> received scrape tasks", "count", len(tasks)) + + c.mut.RLock() + consumers := c.args.ForwardTo + c.mut.RUnlock() + + // TODO(thampiotr): should handle ctx.Done() here! + for _, consumer := range consumers { + consumer.Consume(tasks) + } + c.tasksCounter.Add(float64(len(tasks))) + + return nil +} + +func (c *Component) serverNeedsUpdate(newArgs Arguments) bool { + return c.server == nil || c.args.RedisAddress != newArgs.RedisAddress || c.args.Concurrency != newArgs.Concurrency +} diff --git a/internal/component/prometheus/scrape_task/scrape_task.go b/internal/component/prometheus/scrape_task/scrape_task.go index 57ede7af4a..0389c0dc82 100644 --- a/internal/component/prometheus/scrape_task/scrape_task.go +++ b/internal/component/prometheus/scrape_task/scrape_task.go @@ -5,10 +5,18 @@ import ( "github.com/grafana/alloy/internal/component/prometheus/scrape_task/internal/promadapter" ) +const ( + TaskTypeScrapeTaskV1 = "alloy:scrape_task:v1" +) + type ScrapeTask struct { Target discovery.Target } +type ScrapeTaskProvider interface { + Get() []ScrapeTask +} + type ScrapeTaskConsumer interface { Consume(tasks []ScrapeTask) }