Skip to content

Commit

Permalink
add demo and final tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed Dec 6, 2024
1 parent ee520c1 commit 2b2a96b
Show file tree
Hide file tree
Showing 21 changed files with 726 additions and 102 deletions.
1 change: 1 addition & 0 deletions demo/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
tmp/
67 changes: 67 additions & 0 deletions demo/consumer-cluster.alloy
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@

logging {
// level = "debug"
level = "info"
}

prometheus.scrape_task.receive.fake "a" {
forward_to = [
prometheus.scrape_task.scrape.a.receiver,
]
// This should match the producer.alloy
scrape_interval = "15s"
targets_count = 1000
}

prometheus.scrape_task.scrape "a" {
forward_to = [prometheus.scrape_task.send.a.receiver]
pool_size = 20
}

prometheus.scrape_task.send "a" {

}

// Traditional pipeline to publish metrics

prometheus.exporter.self "self" {}

discovery.relabel "replace_instance" {
targets = prometheus.exporter.self.self.targets
rule {
action = "replace"
source_labels = ["instance"]
target_label = "instance"
replacement = env("INSTANCE")
}
rule {
action = "replace"
source_labels = ["job"]
target_label = "job"
replacement = "integrations/alloy"
}
}

prometheus.scrape "self" {
targets = discovery.relabel.replace_instance.output
forward_to = [prometheus.remote_write.cloud.receiver]
scrape_interval = "15s"
}

prometheus.remote_write "cloud" {
endpoint {
url = "https://prometheus-prod-05-gb-south-0.grafana.net/api/prom/push"

basic_auth {
username = "949385"
password_file = env("MIMIR_PWD_FILE")
}
}

external_labels = {
"cluster" = "my-desk",
"namespace" = "alloy",
"alloy_instance" = env("INSTANCE"),
"role" = "cluster-consumer",
}
}
17 changes: 10 additions & 7 deletions test.alloy → demo/consumer-redis.alloy
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ logging {
level = "info"
}

prometheus.scrape_task.receive "a" {
prometheus.scrape_task.receive.redis "a" {
forward_to = [
prometheus.scrape_task.scrape.a.receiver,
]
simulate_stable_assignment = true
concurrency = 1
redis_address = "localhost:6379"
}


prometheus.scrape_task.scrape "a" {
forward_to = [prometheus.scrape_task.send.a.receiver]
pool_size = 10
pool_size = 20
}

prometheus.scrape_task.send "a" {
Expand All @@ -30,13 +32,13 @@ discovery.relabel "replace_instance" {
action = "replace"
source_labels = ["instance"]
target_label = "instance"
replacement = env("INSTANCE")
replacement = env("INSTANCE")
}
rule {
action = "replace"
source_labels = ["job"]
target_label = "job"
replacement = "integrations/alloy"
replacement = "integrations/alloy"
}
}

Expand All @@ -58,7 +60,8 @@ prometheus.remote_write "cloud" {

external_labels = {
"cluster" = "my-desk",
"namespace" = "alloy",
"alloy_instance" = env("INSTANCE"),
"namespace" = "alloy",
"alloy_instance" = env("INSTANCE"),
"role" = "redis-consumer",
}
}
57 changes: 57 additions & 0 deletions demo/producer.alloy
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@

logging {
// level = "debug"
level = "info"
}

prometheus.scrape_task.produce.redis "a" {
redis_address = "localhost:6379"
// This should match the consumer-cluster.alloy
scrape_interval = "15s"
targets_count = 1000
}


// Traditional pipeline to publish own metrics

prometheus.exporter.self "self" {}

discovery.relabel "replace_instance" {
targets = prometheus.exporter.self.self.targets
rule {
action = "replace"
source_labels = ["instance"]
target_label = "instance"
replacement = env("INSTANCE")
}
rule {
action = "replace"
source_labels = ["job"]
target_label = "job"
replacement = "integrations/alloy"
}
}

prometheus.scrape "self" {
targets = discovery.relabel.replace_instance.output
forward_to = [prometheus.remote_write.cloud.receiver]
scrape_interval = "15s"
}

prometheus.remote_write "cloud" {
endpoint {
url = "https://prometheus-prod-05-gb-south-0.grafana.net/api/prom/push"

basic_auth {
username = "949385"
password_file = env("MIMIR_PWD_FILE")
}
}

external_labels = {
"cluster" = "my-desk",
"namespace" = "alloy",
"alloy_instance" = env("INSTANCE"),
"role" = "producer",
}
}
14 changes: 14 additions & 0 deletions demo/reload-configs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# producer
curl localhost:8000/-/reload

# redis
curl localhost:8001/-/reload
curl localhost:8002/-/reload
curl localhost:8003/-/reload
curl localhost:8004/-/reload

# cluster
curl localhost:8101/-/reload
curl localhost:8102/-/reload
curl localhost:8103/-/reload
curl localhost:8104/-/reload
2 changes: 2 additions & 0 deletions demo/run-consumer-cluster.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
echo "running $1 on port $2"
INSTANCE=$1 MIMIR_PWD_FILE=~/workspace/go-playground/secrets/MIMIR_TOKEN alloy run --stability.level experimental consumer-cluster.alloy --cluster.name cluster --cluster.enabled --cluster.node-name "$1" --server.http.listen-addr "127.0.0.1:$2" --cluster.advertise-address "127.0.0.1:$2" --storage.path "./tmp/data_$1" --cluster.join-addresses "127.0.0.1:8101,127.0.0.1:8102,127.0.0.1:8103,127.0.0.1:8104"
2 changes: 2 additions & 0 deletions demo/run-consumer-redis.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
echo "running $1 on port $2"
INSTANCE=$1 MIMIR_PWD_FILE=~/workspace/go-playground/secrets/MIMIR_TOKEN alloy run --stability.level experimental consumer-redis.alloy --cluster.name redis --cluster.enabled --cluster.node-name "$1" --server.http.listen-addr "127.0.0.1:$2" --cluster.advertise-address "127.0.0.1:$2" --storage.path "./tmp/data_$1" --cluster.join-addresses "127.0.0.1:8001,127.0.0.1:8002,127.0.0.1:8003,127.0.0.1:8004"
3 changes: 3 additions & 0 deletions demo/run-producer.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

echo "running producer $1 on port $2"
INSTANCE=$1 MIMIR_PWD_FILE=~/workspace/go-playground/secrets/MIMIR_TOKEN alloy run --stability.level experimental producer.alloy --server.http.listen-addr "127.0.0.1:$2" --storage.path "./tmp/data_$1"
103 changes: 103 additions & 0 deletions demo/run_multiple.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package main

import (
"bufio"
"context"
"fmt"
"io"
"os"
"os/exec"
"os/signal"
"strconv"
"sync"
"syscall"
)

func main() {
if len(os.Args) < 5 {
fmt.Println("Usage: go run main.go <script-path> <name-prefix> <port-start> <N>")
os.Exit(1)
}

scriptPath := os.Args[1]
namePrefix := os.Args[2]
portStart, err := strconv.Atoi(os.Args[3])
if err != nil {
fmt.Println("Invalid port start")
os.Exit(1)
}
numInstances, err := strconv.Atoi(os.Args[4])
if err != nil {
fmt.Println("Error converting num instances to integer")
os.Exit(1)
}

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup

// Trap SIGINT (CTRL+C) and SIGTERM to gracefully shutdown
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

// Run the script N times in parallel
for i := 0; i < numInstances; i++ {
wg.Add(1)
go func(instanceID int) {
defer wg.Done()
runScript(ctx, scriptPath, namePrefix, portStart, instanceID)
}(i)
}

// Wait for CTRL+C or termination signal
<-signalChan
fmt.Println("\nShutting down...")
cancel() // Signal all scripts to stop

wg.Wait() // Wait for all goroutines to finish
fmt.Println("All scripts stopped.")
os.Exit(0)
}

// runScript runs a given script in a separate goroutine and pipes its output with a prefix
func runScript(ctx context.Context, scriptPath string, namePrefix string, portStart int, instanceID int) {
cmd := exec.CommandContext(ctx, "bash", scriptPath, namePrefix+"-"+strconv.Itoa(instanceID), strconv.Itoa(portStart+instanceID))

// Get the stdout and stderr pipes
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
fmt.Printf("[Instance %d] Error creating stdout pipe: %v\n", instanceID, err)
return
}

stderrPipe, err := cmd.StderrPipe()
if err != nil {
fmt.Printf("[Instance %d] Error creating stderr pipe: %v\n", instanceID, err)
return
}

// Start the command
if err := cmd.Start(); err != nil {
fmt.Printf("[Instance %d] Failed to start script: %v\n", instanceID, err)
return
}

// Log stdout and stderr with prefixes
go logOutput(fmt.Sprintf("[Instance %d] stdout: ", instanceID), stdoutPipe)
go logOutput(fmt.Sprintf("[Instance %d] stderr: ", instanceID), stderrPipe)

// Wait for the command to complete
if err := cmd.Wait(); err != nil {
fmt.Printf("[Instance %d] Script exited with error: %v\n", instanceID, err)
}
}

// logOutput reads from a pipe and logs each line with a given prefix
func logOutput(prefix string, pipe io.ReadCloser) {
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
fmt.Println(prefix + scanner.Text())
}
if err := scanner.Err(); err != nil {
fmt.Printf("%s Error reading output: %v\n", prefix, err)
}
}
11 changes: 7 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,13 @@ require (
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/net v0.31.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sys v0.27.0
golang.org/x/sys v0.28.0
golang.org/x/text v0.20.0
golang.org/x/time v0.6.0
golang.org/x/time v0.8.0
golang.org/x/tools v0.25.0
google.golang.org/api v0.188.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.1
google.golang.org/protobuf v1.35.2
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
gotest.tools v2.2.0+incompatible
Expand Down Expand Up @@ -739,7 +739,7 @@ require (
github.com/sony/gobreaker v0.5.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
github.com/spf13/viper v1.19.0 // indirect
github.com/stormcat24/protodep v0.1.8 // indirect
Expand Down Expand Up @@ -833,10 +833,13 @@ require (
github.com/antchfx/xpath v1.3.2 // indirect
github.com/ebitengine/purego v0.8.0 // indirect
github.com/elastic/lunes v0.1.0 // indirect
github.com/hibiken/asynq v0.25.0 // indirect
github.com/mdlayher/vsock v1.2.1 // indirect
github.com/moby/sys/userns v0.1.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.112.0 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/redis/go-redis/v9 v9.7.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
go.opentelemetry.io/collector/connector/connectorprofiles v0.112.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.112.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles v0.112.0 // indirect
Expand Down
Loading

0 comments on commit 2b2a96b

Please sign in to comment.