Skip to content

Commit

Permalink
Add per-account tiered storage calculation (#156)
Browse files Browse the repository at this point in the history
* add tiered storage calculation

Signed-off-by: Caleb Lloyd <[email protected]>

* capital R in tier

Signed-off-by: Caleb Lloyd <[email protected]>

* Switched calculation based on replica count

* add tiered reserved

Signed-off-by: Caleb Lloyd <[email protected]>

---------

Signed-off-by: Caleb Lloyd <[email protected]>
Co-authored-by: Jeremy Saenz <[email protected]>
  • Loading branch information
caleblloyd and codegangsta authored Nov 27, 2023
1 parent 4db7adc commit 6beb11e
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 23 deletions.
98 changes: 75 additions & 23 deletions surveyor/collector_statz.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,26 @@ type statzDescs struct {
JetstreamClusterRaftGroupReplicaOffline *prometheus.Desc

// Account scope metrics
accCount *prometheus.Desc
accConnCount *prometheus.Desc
accLeafCount *prometheus.Desc
accSubCount *prometheus.Desc
accBytesSent *prometheus.Desc
accBytesRecv *prometheus.Desc
accMsgsSent *prometheus.Desc
accMsgsRecv *prometheus.Desc
accJetstreamEnabled *prometheus.Desc
accJetstreamMemoryUsed *prometheus.Desc
accJetstreamStorageUsed *prometheus.Desc
accJetstreamMemoryReserved *prometheus.Desc
accJetstreamStorageReserved *prometheus.Desc
accJetstreamStreamCount *prometheus.Desc
accJetstreamConsumerCount *prometheus.Desc
accJetstreamReplicaCount *prometheus.Desc
accCount *prometheus.Desc
accConnCount *prometheus.Desc
accLeafCount *prometheus.Desc
accSubCount *prometheus.Desc
accBytesSent *prometheus.Desc
accBytesRecv *prometheus.Desc
accMsgsSent *prometheus.Desc
accMsgsRecv *prometheus.Desc
accJetstreamEnabled *prometheus.Desc
accJetstreamMemoryUsed *prometheus.Desc
accJetstreamStorageUsed *prometheus.Desc
accJetstreamMemoryReserved *prometheus.Desc
accJetstreamStorageReserved *prometheus.Desc
accJetstreamTieredMemoryUsed *prometheus.Desc
accJetstreamTieredStorageUsed *prometheus.Desc
accJetstreamTieredMemoryReserved *prometheus.Desc
accJetstreamTieredStorageReserved *prometheus.Desc
accJetstreamStreamCount *prometheus.Desc
accJetstreamConsumerCount *prometheus.Desc
accJetstreamReplicaCount *prometheus.Desc
}

// StatzCollector collects statz from a server deployment
Expand Down Expand Up @@ -159,13 +163,17 @@ type accountStats struct {
msgsSent float64
msgsRecv float64

jetstreamEnabled float64
jetstreamMemoryUsed float64
jetstreamStorageUsed float64
jetstreamMemoryReserved float64
jetstreamStorageReserved float64
jetstreamStreamCount float64
jetstreamStreams []streamAccountStats
jetstreamEnabled float64
jetstreamMemoryUsed float64
jetstreamStorageUsed float64
jetstreamMemoryReserved float64
jetstreamStorageReserved float64
jetstreamTieredMemoryUsed map[int]float64
jetstreamTieredStorageUsed map[int]float64
jetstreamTieredMemoryReserved map[int]float64
jetstreamTieredStorageReserved map[int]float64
jetstreamStreamCount float64
jetstreamStreams []streamAccountStats
}

type streamAccountStats struct {
Expand Down Expand Up @@ -311,6 +319,10 @@ func (sc *StatzCollector) buildDescs() {
sc.descs.accJetstreamStorageUsed = newPromDesc("account_jetstream_storage_used", "The number of bytes used by JetStream storage", accLabel)
sc.descs.accJetstreamMemoryReserved = newPromDesc("account_jetstream_memory_reserved", "The number of bytes reserved by JetStream memory", accLabel)
sc.descs.accJetstreamStorageReserved = newPromDesc("account_jetstream_storage_reserved", "The number of bytes reserved by JetStream storage", accLabel)
sc.descs.accJetstreamTieredMemoryUsed = newPromDesc("account_jetstream_tiered_memory_used", "The number of bytes used by JetStream memory tier", append(accLabel, "tier"))
sc.descs.accJetstreamTieredStorageUsed = newPromDesc("account_jetstream_tiered_storage_used", "The number of bytes used by JetStream storage tier", append(accLabel, "tier"))
sc.descs.accJetstreamTieredMemoryReserved = newPromDesc("account_jetstream_tiered_memory_reserved", "The number of bytes reserved by JetStream memory tier", append(accLabel, "tier"))
sc.descs.accJetstreamTieredStorageReserved = newPromDesc("account_jetstream_tiered_storage_reserved", "The number of bytes reserved by JetStream storage tier", append(accLabel, "tier"))
sc.descs.accJetstreamStreamCount = newPromDesc("account_jetstream_stream_count", "The number of streams in this account", accLabel)
sc.descs.accJetstreamConsumerCount = newPromDesc("account_jetstream_consumer_count", "The number of consumers per stream for this account", append(accLabel, "stream"))
sc.descs.accJetstreamReplicaCount = newPromDesc("account_jetstream_replica_count", "The number of replicas per stream for this account", append(accLabel, "stream"))
Expand Down Expand Up @@ -563,6 +575,10 @@ func (sc *StatzCollector) pollAccountInfo() error {
sts.jetstreamStorageUsed = float64(jsInfo.Store)
sts.jetstreamMemoryReserved = float64(jsInfo.ReservedMemory)
sts.jetstreamStorageReserved = float64(jsInfo.ReservedStore)
sts.jetstreamTieredMemoryUsed = make(map[int]float64)
sts.jetstreamTieredStorageUsed = make(map[int]float64)
sts.jetstreamTieredMemoryReserved = make(map[int]float64)
sts.jetstreamTieredStorageReserved = make(map[int]float64)

sts.jetstreamStreamCount = float64(len(jsInfo.Streams))
for _, stream := range jsInfo.Streams {
Expand All @@ -571,6 +587,30 @@ func (sc *StatzCollector) pollAccountInfo() error {
consumerCount: float64(len(stream.Consumer)),
replicaCount: float64(stream.Config.Replicas),
})

// computed tiered storage usage
used := float64(stream.State.Bytes * uint64(stream.Config.Replicas))
var reserved float64
if stream.Config.MaxBytes > 0 {
reserved = float64(stream.Config.MaxBytes * int64(stream.Config.Replicas))
}
if stream.Config.Storage == server.MemoryStorage {
if _, ok = sts.jetstreamTieredMemoryUsed[stream.Config.Replicas]; ok {
sts.jetstreamTieredMemoryUsed[stream.Config.Replicas] += used
sts.jetstreamTieredMemoryReserved[stream.Config.Replicas] += reserved
} else {
sts.jetstreamTieredMemoryUsed[stream.Config.Replicas] = used
sts.jetstreamTieredMemoryReserved[stream.Config.Replicas] = reserved
}
} else if stream.Config.Storage == server.FileStorage {
if _, ok = sts.jetstreamTieredStorageUsed[stream.Config.Replicas]; ok {
sts.jetstreamTieredStorageUsed[stream.Config.Replicas] += used
sts.jetstreamTieredStorageReserved[stream.Config.Replicas] += reserved
} else {
sts.jetstreamTieredStorageUsed[stream.Config.Replicas] = used
sts.jetstreamTieredStorageReserved[stream.Config.Replicas] = reserved
}
}
}
accStats[jsInfo.Id] = sts
}
Expand Down Expand Up @@ -1000,6 +1040,18 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) {
ch <- newGaugeMetric(sc.descs.accJetstreamStorageUsed, stat.jetstreamStorageUsed, id)
ch <- newGaugeMetric(sc.descs.accJetstreamMemoryReserved, stat.jetstreamMemoryReserved, id)
ch <- newGaugeMetric(sc.descs.accJetstreamStorageReserved, stat.jetstreamStorageReserved, id)
for tier, size := range stat.jetstreamTieredMemoryUsed {
ch <- newGaugeMetric(sc.descs.accJetstreamTieredMemoryUsed, size, append(id, fmt.Sprintf("R%d", tier)))
}
for tier, size := range stat.jetstreamTieredStorageUsed {
ch <- newGaugeMetric(sc.descs.accJetstreamTieredStorageUsed, size, append(id, fmt.Sprintf("R%d", tier)))
}
for tier, size := range stat.jetstreamTieredMemoryReserved {
ch <- newGaugeMetric(sc.descs.accJetstreamTieredMemoryReserved, size, append(id, fmt.Sprintf("R%d", tier)))
}
for tier, size := range stat.jetstreamTieredStorageReserved {
ch <- newGaugeMetric(sc.descs.accJetstreamTieredStorageReserved, size, append(id, fmt.Sprintf("R%d", tier)))
}

ch <- newGaugeMetric(sc.descs.accJetstreamStreamCount, stat.jetstreamStreamCount, id)
for _, streamStat := range stat.jetstreamStreams {
Expand Down
4 changes: 4 additions & 0 deletions surveyor/surveyor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ func TestSurveyor_AccountJetStreamAssets(t *testing.T) {
`nats_core_account_jetstream_consumer_count{account="JS",stream="repl1"} 5`,
`nats_core_account_jetstream_consumer_count{account="JS",stream="repl2"} 5`,
`nats_core_account_jetstream_consumer_count{account="JS",stream="single1"} 5`,
`nats_core_account_jetstream_tiered_storage_used{account="JS",tier="R1"}`,
`nats_core_account_jetstream_tiered_storage_used{account="JS",tier="R3"}`,
`nats_core_account_jetstream_tiered_storage_reserved{account="JS",tier="R1"}`,
`nats_core_account_jetstream_tiered_storage_reserved{account="JS",tier="R3"}`,
}
for _, m := range want {
if !strings.Contains(output, m) {
Expand Down

0 comments on commit 6beb11e

Please sign in to comment.