Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce db stats collector #648

Merged
merged 15 commits into from
Sep 26, 2024
Merged
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ sec: ## Run security checks

.PHONY: fmt
fmt: install-tools ## Formats all go files
$(GO) generate ./...
$(GO) run $(govulncheck) ./...
$(GO) run $(gofumpt) -l -w -extra .
find . -type f -name '*.go' -exec grep -L -E 'Code generated by .*\. DO NOT EDIT.' {} + | xargs $(GO) run $(goimports) -format-only -w -local=github.com/rudderlabs
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/rudderlabs/rudder-go-kit

go 1.22.7
go 1.23.1

replace github.com/gocql/gocql => github.com/scylladb/gocql v1.14.2

Expand Down Expand Up @@ -118,6 +118,7 @@ require (
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mattn/go-sqlite3 v1.14.23
github.com/minio/md5-simd v1.1.2 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/term v0.5.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-shellwords v1.0.12 h1:M2zGm7EW6UQJvDeQxo4T51eKPurbeFbe8WtebGE2xrk=
github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y=
github.com/mattn/go-sqlite3 v1.14.23 h1:gbShiuAP1W5j9UOksQ06aiiqPMxYecovVGwmTxWtuw0=
github.com/mattn/go-sqlite3 v1.14.23/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/melbahja/goph v1.4.0 h1:z0PgDbBFe66lRYl3v5dGb9aFgPy0kotuQ37QOwSQFqs=
github.com/melbahja/goph v1.4.0/go.mod h1:uG+VfK2Dlhk+O32zFrRlc3kYKTlV6+BtvPWd/kK7U68=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4=
Expand Down
76 changes: 76 additions & 0 deletions stats/collectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package stats

import (
"context"
"fmt"
"sync"
"time"
)

type gaugeTagsFunc = func(key string, tags Tags, val uint64)

type Collector interface {
Collect(gaugeTagsFunc)
Zero(gaugeTagsFunc)
ID() string
}

type aggregatedCollector struct {
c map[string]Collector
PauseDur time.Duration
gaugeFunc gaugeTagsFunc
mu sync.Mutex
}

func (p *aggregatedCollector) Add(c Collector) error {
p.mu.Lock()
defer p.mu.Unlock()

if p.c == nil {
p.c = make(map[string]Collector)
}

if _, ok := p.c[c.ID()]; ok {
return fmt.Errorf("collector with ID %s already register", c.ID())
}

p.c[c.ID()] = c
return nil
}

func (p *aggregatedCollector) Run(ctx context.Context) {
defer p.allZero()
p.allCollect()

if p.PauseDur <= 0 {
p.PauseDur = 10 * time.Second
return
lvrach marked this conversation as resolved.
Show resolved Hide resolved
}

tick := time.NewTicker(p.PauseDur)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case <-tick.C:
p.allCollect()
}
}
}

func (p *aggregatedCollector) allCollect() {
p.mu.Lock()
defer p.mu.Unlock()
for _, c := range p.c {
c.Collect(p.gaugeFunc)
}
}

func (p *aggregatedCollector) allZero() {
p.mu.Lock()
defer p.mu.Unlock()
for _, c := range p.c {
c.Zero(p.gaugeFunc)
}
}
62 changes: 62 additions & 0 deletions stats/collectors/sqldb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package collectors

import (
"database/sql"
"fmt"

"github.com/rudderlabs/rudder-go-kit/stats"
)

const (
uniqName = "database_sql_%s"
)

type sqlDBStats struct {
name string
db *sql.DB
}

func NewDatabaseSQLStats(name string, db *sql.DB) *sqlDBStats {
lvrach marked this conversation as resolved.
Show resolved Hide resolved
return &sqlDBStats{
name: name,
db: db,
}
}

func (s *sqlDBStats) Collect(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
dbStats := s.db.Stats()
tags := stats.Tags{"name": s.name}

gaugeFunc("sql_db_max_open_connections", tags, uint64(dbStats.MaxOpenConnections))
gaugeFunc("sql_db_open_connections", tags, uint64(dbStats.OpenConnections))
gaugeFunc("sql_db_in_use_connections", tags, uint64(dbStats.InUse))
gaugeFunc("sql_db_idle_connections", tags, uint64(dbStats.Idle))

gaugeFunc("sql_db_wait_count_total", tags, uint64(dbStats.WaitCount))
gaugeFunc("sql_db_wait_duration_seconds_total", tags, uint64(dbStats.WaitDuration.Seconds()))

gaugeFunc("sql_db_max_idle_closed_total", tags, uint64(dbStats.MaxIdleClosed))
gaugeFunc("sql_db_max_idle_time_closed_total", tags, uint64(dbStats.MaxIdleTimeClosed))
gaugeFunc("sql_db_max_lifetime_closed_total", tags, uint64(dbStats.MaxLifetimeClosed))
}

func (s *sqlDBStats) Zero(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
tags := stats.Tags{"name": s.name}

gaugeFunc("sql_db_max_open_connections", tags, 0)

gaugeFunc("sql_db_open_connections", tags, 0)
gaugeFunc("sql_db_in_use_connections", tags, 0)
gaugeFunc("sql_db_idle_connections", tags, 0)

gaugeFunc("sql_db_wait_count_total", tags, 0)
gaugeFunc("sql_db_wait_duration_seconds_total", tags, 0)

gaugeFunc("sql_db_max_idle_closed_total", tags, 0)
gaugeFunc("sql_db_max_idle_time_closed_total", tags, 0)
gaugeFunc("sql_db_max_lifetime_closed_total", tags, 0)
}

func (s *sqlDBStats) ID() string {
return fmt.Sprintf(uniqName, s.name)
}
76 changes: 76 additions & 0 deletions stats/collectors/sqldb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package collectors_test

import (
"database/sql"
"testing"

_ "github.com/mattn/go-sqlite3" // Import the SQLite driver

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/collectors"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
)

func TestSQLDatabase(t *testing.T) {
db, err := sql.Open("sqlite3", ":memory:")
require.NoError(t, err)

m, err := memstats.New()
require.NoError(t, err)

testName := "test_sqlite"
s := collectors.NewDatabaseSQLStats(testName, db)

err = m.RegisterCollector(s)
require.NoError(t, err)

require.Equal(t, []memstats.Metric{
{
Name: "sql_db_idle_connections",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_in_use_connections",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_max_idle_closed_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_max_idle_time_closed_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_max_lifetime_closed_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_max_open_connections",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_open_connections",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_wait_count_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_wait_duration_seconds_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
}, m.GetAll())
}
39 changes: 39 additions & 0 deletions stats/collectors/static.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package collectors

import (
"fmt"

"github.com/rudderlabs/rudder-go-kit/stats"
)

const (
statsUniqName = "static_%s_%s"
)

type staticStats struct {
tags stats.Tags
key string
value uint64
}

// NewStaticMetric allows to capture a gauge metric that does not change during the lifetime of the application.
// Can be useful for capturing configuration values or application version.
func NewStaticMetric(key string, tags stats.Tags, value uint64) *staticStats {
return &staticStats{
tags: tags,
key: key,
value: value,
}
}

func (s *staticStats) Collect(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
gaugeFunc(s.key, s.tags, s.value)
}

func (s *staticStats) Zero(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
gaugeFunc(s.key, s.tags, 0)
}

func (s *staticStats) ID() string {
return fmt.Sprintf(statsUniqName, s.key, s.tags.String())
}
32 changes: 32 additions & 0 deletions stats/collectors/static_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package collectors_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/collectors"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
)

func TestStatic(t *testing.T) {
testName := "test_sqlite"
s := collectors.NewStaticMetric(testName, stats.Tags{
"foo": "bar",
}, 2)

m, err := memstats.New()
require.NoError(t, err)

err = m.RegisterCollector(s)
require.NoError(t, err)

require.Equal(t, []memstats.Metric{
{
Name: testName,
Tags: stats.Tags{"foo": "bar"},
Value: 2,
},
}, m.GetAll())
}
7 changes: 7 additions & 0 deletions stats/memstats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,13 @@ func (*Store) Start(_ context.Context, _ stats.GoRoutineFactory) error { return
// Stop implements stats.Stats
func (*Store) Stop() {}

func (s *Store) RegisterCollector(c stats.Collector) error {
c.Collect(func(key string, tags stats.Tags, val uint64) {
s.NewTaggedStat(key, stats.GaugeType, tags).Gauge(val)
})
return nil
}

// getKey maps name and tags, to a store lookup key.
func (*Store) getKey(name string, tags stats.Tags) string {
return name + tags.String()
Expand Down
14 changes: 14 additions & 0 deletions stats/mock_stats/mock_stats.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions stats/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ func (*nop) NewTracer(_ string) Tracer {

func (*nop) Start(_ context.Context, _ GoRoutineFactory) error { return nil }
func (*nop) Stop() {}

func (*nop) RegisterCollector(c Collector) error { return nil }
13 changes: 13 additions & 0 deletions stats/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type otelStats struct {
histogramsMu sync.Mutex

otelManager otel.Manager
collectorAggregator *aggregatedCollector
runtimeStatsCollector runtimeStatsCollector
metricsStatsCollector metricStatsCollector
stopBackgroundCollection func()
Expand Down Expand Up @@ -181,6 +182,14 @@ func (s *otelStats) Start(ctx context.Context, goFactory GoRoutineFactory) error
s.metricsStatsCollector.run(backgroundCollectionCtx)
})

gaugeTagsFunc := func(key string, tags Tags, val uint64) {
s.getMeasurement(key, GaugeType, tags).Gauge(val)
}
s.collectorAggregator.gaugeFunc = gaugeTagsFunc
goFactory.Go(func() {
s.collectorAggregator.Run(backgroundCollectionCtx)
})

if s.config.periodicStatsConfig.enabled {
s.runtimeStatsCollector = newRuntimeStatsCollector(gaugeFunc)
s.runtimeStatsCollector.PauseDur = time.Duration(s.config.periodicStatsConfig.statsCollectionInterval) * time.Second
Expand All @@ -203,6 +212,10 @@ func (s *otelStats) Start(ctx context.Context, goFactory GoRoutineFactory) error
return nil
}

func (s *otelStats) RegisterCollector(c Collector) error {
return s.collectorAggregator.Add(c)
}

func (s *otelStats) Stop() {
if !s.config.enabled.Load() {
return
Expand Down
Loading
Loading