Skip to content

Commit

Permalink
fix test; add prometheus metrics for hubs (dis)connected count
Browse files Browse the repository at this point in the history
  • Loading branch information
IngoRoessner committed Jul 30, 2024
1 parent f77de92 commit 26efe53
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 85 deletions.
96 changes: 71 additions & 25 deletions pkg/pkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"log"
"runtime/debug"
"sync"
"time"
)

func Start(ctx context.Context, wg *sync.WaitGroup, config configuration.Config) error {
Expand Down Expand Up @@ -94,30 +95,75 @@ func getOnMetricsServeRequestHandler(tokengen *auth.Security, perm client.Client
debug.PrintStack()
return
}
connected, err := perm.Total(token, "devices", client.ListOptions{
Selection: &client.FeatureSelection{
Feature: "annotations.connected",
Value: "true",
},
})
if err != nil {
log.Println("ERROR:", err)
debug.PrintStack()
return
}
metrics.TotalConnected.Set(float64(connected))

disconnected, err := perm.Total(token, "devices", client.ListOptions{
Selection: &client.FeatureSelection{
Feature: "annotations.connected",
Value: "false",
},
})
if err != nil {
log.Println("ERROR:", err)
debug.PrintStack()
return
}
metrics.TotalDisconnected.Set(float64(disconnected))
wg := sync.WaitGroup{}

start := time.Now()

wg.Add(1)
go func() {
defer wg.Done()
connected, err := perm.Total(token, "devices", client.ListOptions{
Selection: &client.FeatureSelection{
Feature: "annotations.connected",
Value: "true",
},
})
if err != nil {
log.Println("ERROR: unable to load total connected device count from permission-search;", err)
return
}
metrics.TotalConnected.Set(float64(connected))
}()

wg.Add(1)
go func() {
defer wg.Done()
disconnected, err := perm.Total(token, "devices", client.ListOptions{
Selection: &client.FeatureSelection{
Feature: "annotations.connected",
Value: "false",
},
})
if err != nil {
log.Println("ERROR: unable to load total disconnected device count from permission-search;", err)
return
}
metrics.TotalDisconnected.Set(float64(disconnected))
}()

wg.Add(1)
go func() {
defer wg.Done()
connected, err := perm.Total(token, "hubs", client.ListOptions{
Selection: &client.FeatureSelection{
Feature: "annotations.connected",
Value: "true",
},
})
if err != nil {
log.Println("ERROR: unable to load total connected hub count from permission-search;", err)
return
}
metrics.TotalHubsConnected.Set(float64(connected))
}()

wg.Add(1)
go func() {
defer wg.Done()
disconnected, err := perm.Total(token, "hubs", client.ListOptions{
Selection: &client.FeatureSelection{
Feature: "annotations.connected",
Value: "false",
},
})
if err != nil {
log.Println("ERROR: unable to load total disconnected hub count from permission-search;", err)
return
}
metrics.TotalHubsDisconnected.Set(float64(disconnected))
}()

wg.Wait()
metrics.PermissionsRequestDurationForConnectionMetrics.Set(float64(time.Since(start).Milliseconds()))
}
}
20 changes: 20 additions & 0 deletions pkg/prometheus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type Metrics struct {
TotalConnected prometheus.Gauge
TotalDisconnected prometheus.Gauge

TotalHubsConnected prometheus.Gauge
TotalHubsDisconnected prometheus.Gauge

PermissionsRequestDurationForConnectionMetrics prometheus.Gauge

httphandler http.Handler

onMetricsServeRequest func()
Expand Down Expand Up @@ -114,6 +119,18 @@ func NewMetrics(prefix string) *Metrics {
Name: prefix + "_total_disconnected",
Help: "total count of all disconnected devices",
}),
TotalHubsConnected: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefix + "_hubs_total_connected",
Help: "total count of all connected hubs",
}),
TotalHubsDisconnected: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefix + "_hubs_total_disconnected",
Help: "total count of all disconnected hubs",
}),
PermissionsRequestDurationForConnectionMetrics: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefix + "_permissions_request_duration_for_connection_metrics_ms",
Help: "time needed for permissions-search requests that count (dis)connected hubs and devices for metrics",
}),
}

reg.MustRegister(m.TopicsChecked)
Expand All @@ -130,6 +147,9 @@ func NewMetrics(prefix string) *Metrics {
reg.MustRegister(m.SendHubDisconnected)
reg.MustRegister(m.TotalConnected)
reg.MustRegister(m.TotalDisconnected)
reg.MustRegister(m.TotalHubsConnected)
reg.MustRegister(m.TotalHubsDisconnected)
reg.MustRegister(m.PermissionsRequestDurationForConnectionMetrics)

return m
}
Expand Down
59 changes: 0 additions & 59 deletions pkg/tests/docker/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,12 @@ package docker

import (
"context"
"fmt"
"github.com/testcontainers/testcontainers-go"
"io"
"log"
"net"
"os"
)

func getFreePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, err
}

listener, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}
defer listener.Close()
return listener.Addr().(*net.TCPAddr).Port, nil
}

func Dockerlog(ctx context.Context, container testcontainers.Container, name string) error {
l, err := container.Logs(ctx)
if err != nil {
Expand All @@ -63,46 +47,3 @@ func (this *LogWriter) Write(p []byte) (n int, err error) {
this.logger.Print(string(p))
return len(p), nil
}

func Forward(ctx context.Context, fromPort int, toAddr string) error {
log.Println("forward", fromPort, "to", toAddr)
incoming, err := net.Listen("tcp", fmt.Sprintf(":%d", fromPort))
if err != nil {
return err
}
go func() {
defer log.Println("closed forward incoming")
<-ctx.Done()
incoming.Close()
}()
go func() {
for {
client, err := incoming.Accept()
if err != nil {
log.Println("FORWARD ERROR:", err)
return
}
go handleForwardClient(client, toAddr)
}
}()
return nil
}

func handleForwardClient(client net.Conn, addr string) {
//log.Println("new forward client")
target, err := net.Dial("tcp", addr)
if err != nil {
log.Println("FORWARD ERROR:", err)
return
}
go func() {
defer target.Close()
defer client.Close()
io.Copy(target, client)
}()
go func() {
defer target.Close()
defer client.Close()
io.Copy(client, target)
}()
}
6 changes: 5 additions & 1 deletion pkg/tests/senergydeviceloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ func createDummyHubs(config configuration.Config) error {
Balancer: &kafka.Hash{},
}
defer writer.Close()
for _, hub := range getDummyHubs() {
hubs := getDummyHubs()
for _, hub := range hubs {
b, err := json.Marshal(map[string]interface{}{
"command": "PUT",
"id": hub.Id,
Expand Down Expand Up @@ -546,6 +547,7 @@ func getDummyHubs() (result []models.Hub) {
result = append(result, models.Hub{
Id: "online-" + strconv.Itoa(i),
Name: "online-" + strconv.Itoa(i),
OwnerId: "testowner",
Hash: "",
DeviceIds: devices,
DeviceLocalIds: localDeviceIds,
Expand All @@ -560,6 +562,7 @@ func getDummyHubs() (result []models.Hub) {
Id: "offline-" + strconv.Itoa(i),
Name: "offline-" + strconv.Itoa(i),
Hash: "",
OwnerId: "testowner",
DeviceIds: devices,
DeviceLocalIds: localDeviceIds,
})
Expand All @@ -572,6 +575,7 @@ func getDummyHubs() (result []models.Hub) {
result = append(result, models.Hub{
Id: "unhandled-" + strconv.Itoa(i),
Name: "unhandled-" + strconv.Itoa(i),
OwnerId: "testowner",
Hash: "",
DeviceIds: devices,
DeviceLocalIds: localDeviceIds,
Expand Down

0 comments on commit 26efe53

Please sign in to comment.