Skip to content

Commit

Permalink
Add raft group for JetStream metrics (#221)
Browse files Browse the repository at this point in the history
* Add raft group for JetStream metrics
  • Loading branch information
mtmk authored Nov 8, 2024
1 parent 56c8cd5 commit 03ec84f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
19 changes: 11 additions & 8 deletions surveyor/collector_statz.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ type accountStats struct {

type streamAccountStats struct {
streamName string
raftGroup string
consumerCount float64
replicaCount float64
}
Expand Down Expand Up @@ -334,8 +335,8 @@ func (sc *StatzCollector) buildDescs() {
sc.descs.accJetstreamTieredMemoryReserved = newPromDesc("account_jetstream_tiered_memory_reserved", "The number of bytes reserved by JetStream memory tier", append(accLabel, "tier"))
sc.descs.accJetstreamTieredStorageReserved = newPromDesc("account_jetstream_tiered_storage_reserved", "The number of bytes reserved by JetStream storage tier", append(accLabel, "tier"))
sc.descs.accJetstreamStreamCount = newPromDesc("account_jetstream_stream_count", "The number of streams in this account", accLabel)
sc.descs.accJetstreamConsumerCount = newPromDesc("account_jetstream_consumer_count", "The number of consumers per stream for this account", append(accLabel, "stream"))
sc.descs.accJetstreamReplicaCount = newPromDesc("account_jetstream_replica_count", "The number of replicas per stream for this account", append(accLabel, "stream"))
sc.descs.accJetstreamConsumerCount = newPromDesc("account_jetstream_consumer_count", "The number of consumers per stream for this account", append(accLabel, "stream", "raft_group"))
sc.descs.accJetstreamReplicaCount = newPromDesc("account_jetstream_replica_count", "The number of replicas per stream for this account", append(accLabel, "stream", "raft_group"))
}

// Surveyor
Expand Down Expand Up @@ -600,6 +601,7 @@ func (sc *StatzCollector) pollAccountInfo() error {
for _, stream := range jsInfo.Streams {
sts.jetstreamStreams = append(sts.jetstreamStreams, streamAccountStats{
streamName: stream.Name,
raftGroup: stream.RaftGroup,
consumerCount: float64(len(stream.Consumer)),
replicaCount: float64(stream.Config.Replicas),
})
Expand Down Expand Up @@ -643,10 +645,11 @@ func (sc *StatzCollector) pollAccountInfo() error {

func (sc *StatzCollector) getJSInfos(nc *nats.Conn) map[string]*server.AccountDetail {
opts := server.JSzOptions{
Accounts: true,
Streams: true,
Consumer: true,
Config: true,
Accounts: true,
Streams: true,
Consumer: true,
Config: true,
RaftGroups: true,
}
res := make([]*server.JSInfo, 0)
req, err := json.Marshal(opts)
Expand Down Expand Up @@ -1087,8 +1090,8 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) {

metrics.newGaugeMetric(sc.descs.accJetstreamStreamCount, stat.jetstreamStreamCount, accLabels)
for _, streamStat := range stat.jetstreamStreams {
metrics.newGaugeMetric(sc.descs.accJetstreamConsumerCount, streamStat.consumerCount, append(accLabels, streamStat.streamName))
metrics.newGaugeMetric(sc.descs.accJetstreamReplicaCount, streamStat.replicaCount, append(accLabels, streamStat.streamName))
metrics.newGaugeMetric(sc.descs.accJetstreamConsumerCount, streamStat.consumerCount, append(accLabels, streamStat.streamName, streamStat.raftGroup))
metrics.newGaugeMetric(sc.descs.accJetstreamReplicaCount, streamStat.replicaCount, append(accLabels, streamStat.streamName, streamStat.raftGroup))
}
}
}
Expand Down
31 changes: 16 additions & 15 deletions surveyor/surveyor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"net/http"
"os"
"regexp"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -326,23 +327,23 @@ func TestSurveyor_AccountJetStreamAssets(t *testing.T) {
t.Fatal(err)
}

want := []string{
"nats_core_account_bytes_recv",
"nats_core_account_bytes_sent",
"nats_core_account_conn_count",
"nats_core_account_count",
"nats_core_account_jetstream_enabled",
`nats_core_account_jetstream_stream_count{account="JS"} 10`,
`nats_core_account_jetstream_consumer_count{account="JS",stream="repl1"} 5`,
`nats_core_account_jetstream_consumer_count{account="JS",stream="repl2"} 5`,
`nats_core_account_jetstream_consumer_count{account="JS",stream="single1"} 5`,
`nats_core_account_jetstream_tiered_storage_used{account="JS",tier="R1"}`,
`nats_core_account_jetstream_tiered_storage_used{account="JS",tier="R3"}`,
`nats_core_account_jetstream_tiered_storage_reserved{account="JS",tier="R1"}`,
`nats_core_account_jetstream_tiered_storage_reserved{account="JS",tier="R3"}`,
want := []*regexp.Regexp{
regexp.MustCompile(`nats_core_account_bytes_recv`),
regexp.MustCompile(`nats_core_account_bytes_sent`),
regexp.MustCompile(`nats_core_account_conn_count`),
regexp.MustCompile(`nats_core_account_count`),
regexp.MustCompile(`nats_core_account_jetstream_enabled`),
regexp.MustCompile(`nats_core_account_jetstream_stream_count\{account="JS"} 10`),
regexp.MustCompile(`nats_core_account_jetstream_consumer_count\{account="JS",raft_group="[^"]+",stream="repl1"} 5`),
regexp.MustCompile(`nats_core_account_jetstream_consumer_count\{account="JS",raft_group="[^"]+",stream="repl2"} 5`),
regexp.MustCompile(`nats_core_account_jetstream_consumer_count\{account="JS",raft_group="[^"]+",stream="single1"} 5`),
regexp.MustCompile(`nats_core_account_jetstream_tiered_storage_used\{account="JS",tier="R1"}`),
regexp.MustCompile(`nats_core_account_jetstream_tiered_storage_used\{account="JS",tier="R3"}`),
regexp.MustCompile(`nats_core_account_jetstream_tiered_storage_reserved\{account="JS",tier="R1"}`),
regexp.MustCompile(`nats_core_account_jetstream_tiered_storage_reserved\{account="JS",tier="R3"}`),
}
for _, m := range want {
if !strings.Contains(output, m) {
if !m.MatchString(output) {
t.Logf("output: %s", output)
t.Fatalf("missing: %s", m)
}
Expand Down

0 comments on commit 03ec84f

Please sign in to comment.