Skip to content

Commit

Permalink
Fix metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
periklis committed Jan 30, 2025
1 parent af2fc52 commit a86a132
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions tools/stream-metadata-generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,18 +226,18 @@ func (s *generator) running(ctx context.Context) error {
ticker := time.NewTicker(time.Second / time.Duration(s.cfg.QPSPerTenant))
defer ticker.Stop()

// Keep track of current stream index
// Keep track of current stream index and whether we've completed first pass
streamIdx := 0
firstPassComplete := false

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Check if we need to reset the stream index
needsReset := streamIdx >= len(streams)
if needsReset {
if streamIdx >= len(streams) {
streamIdx = 0
firstPassComplete = true
}

// Send single stream to Kafka
Expand All @@ -247,9 +247,8 @@ func (s *generator) running(ctx context.Context) error {
return
}

// Only increment the counter if we're not resetting the index
// This avoids double-counting streams when we wrap around
if !needsReset {
// Only increment during the first pass
if !firstPassComplete {
s.metrics.activeStreamsTotal.WithLabelValues(tenantID).Inc()
}

Expand Down

0 comments on commit a86a132

Please sign in to comment.