Skip to content

Commit

Permalink
feat: collect metrics from engine in RESIZING and DRAINING statuses (#15)
Browse files Browse the repository at this point in the history

Previously, metrics were collected only from engines in the `RUNNING` status.

Now collection is extended to engines in the `RESIZING` and `DRAINING`
statuses. Also, an engine status attribute is added to the engine
runtime and query history metrics.
  • Loading branch information
alexkaplun-firebolt authored Dec 13, 2024
1 parent acd804a commit 7369411
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 131 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ docker run --name firebolt-otel-exporter \
-e FIREBOLT_OTEL_EXPORTER_GRPC_ADDRESS=127.0.0.1:4317 \
-e FIREBOLT_OTEL_EXPORTER_LOG_LEVEL=debug \
--network="host" \
ghcr.io/firebolt-db/otel-exporter:v0.0.5
ghcr.io/firebolt-db/otel-exporter:v0.0.6
```

Meters and instruments
Expand All @@ -59,6 +59,7 @@ The exporter's structure of meters and instruments is described below. See [OTLP
All the instruments in this meter have the following attributes:
- `firebolt.account.name` - name of the account
- `firebolt.engine.name` - name of the engine
- `firebolt.engine.status` - status of the engine (possible statuses are `RUNNING`, `RESIZING`, `DRAINING`)

### Meter name: `firebolt.engine.query_history`

Expand All @@ -79,6 +80,7 @@ All the instruments in this meter have the following attributes:
- `firebolt.engine.name` - name of the engine
- `firebolt.user.name` - name of the user executing query
- `firebolt.query.status` - status of the query
- `firebolt.engine.status` - status of the engine (possible statuses are `RUNNING`, `RESIZING`, `DRAINING`)

### Meter name: `firebolt.exporter`

Expand Down
56 changes: 29 additions & 27 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,44 +1,46 @@
module github.com/firebolt-db/otel-exporter

go 1.22.0
go 1.23.1

require (
github.com/firebolt-db/firebolt-go-sdk v1.1.0
github.com/firebolt-db/firebolt-go-sdk v1.4.0
github.com/go-ozzo/ozzo-validation/v4 v4.3.0
github.com/sethvargo/go-envconfig v1.0.3
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.27.2
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.27.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0
go.opentelemetry.io/otel/metric v1.27.0
go.opentelemetry.io/otel/sdk v1.27.0
go.opentelemetry.io/otel/sdk/metric v1.27.0
golang.org/x/oauth2 v0.20.0
google.golang.org/grpc v1.64.1
github.com/sethvargo/go-envconfig v1.1.0
github.com/stretchr/testify v1.10.0
github.com/urfave/cli/v2 v2.27.5
go.opentelemetry.io/otel v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.33.0
go.opentelemetry.io/otel/metric v1.33.0
go.opentelemetry.io/otel/sdk v1.33.0
go.opentelemetry.io/otel/sdk/metric v1.33.0
golang.org/x/oauth2 v0.24.0
google.golang.org/grpc v1.69.0
)

require (
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/compute/metadata v0.5.2 // indirect
github.com/astaxie/beego v1.12.3 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
github.com/matishsiao/goInfo v0.0.0-20210923090445-da2e3fa8d45f // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
github.com/matishsiao/goInfo v0.0.0-20240924010139-10388a85396f // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 // indirect
go.opentelemetry.io/otel/trace v1.27.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240520151616-dc85e6b867a5 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect
google.golang.org/protobuf v1.34.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/trace v1.33.0 // indirect
go.opentelemetry.io/proto/otlp v1.4.0 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
112 changes: 58 additions & 54 deletions go.sum

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions internal/collector/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (

"go.opentelemetry.io/otel/attribute"
api "go.opentelemetry.io/otel/metric"

"github.com/firebolt-db/otel-exporter/internal/fetcher"
)

// collectorFn is a function which is responsible for collecting metrics in a single account, with list of engines.
// It is expected that collectorFn will only collect metrics in time interval between `since` and `till`.
type collectorFn func(ctx context.Context, wg *sync.WaitGroup, accountName string, engines []string, since, till time.Time)
type collectorFn func(ctx context.Context, wg *sync.WaitGroup, accountName string, engines []fetcher.Engine, since, till time.Time)

// Start runs main metrics collection routine with specified interval.
// Start will block until provided context is done, or the app is closed.
Expand Down Expand Up @@ -80,7 +82,7 @@ func (c *collector) reportExporterDuration(ctx context.Context, startTime time.T
}

// collectRuntimeMetrics collects and reports engine runtime metrics, such as cpu utilization, memory utilization etc.
func (c *collector) collectRuntimeMetrics(ctx context.Context, wg *sync.WaitGroup, accountName string, engines []string, since, till time.Time) {
func (c *collector) collectRuntimeMetrics(ctx context.Context, wg *sync.WaitGroup, accountName string, engines []fetcher.Engine, since, till time.Time) {
slog.DebugContext(ctx, "start collecting runtime metrics", slog.String("accountName", accountName))

pointsCh := c.fetcher.FetchRuntimePoints(ctx, accountName, engines, since, till)
Expand All @@ -89,6 +91,7 @@ func (c *collector) collectRuntimeMetrics(ctx context.Context, wg *sync.WaitGrou
attrs := []attribute.KeyValue{
attribute.Key("firebolt.account.name").String(accountName),
attribute.Key("firebolt.engine.name").String(mp.EngineName),
attribute.Key("firebolt.engine.status").String(mp.EngineStatus),
}

attrsSet := attribute.NewSet(attrs...)
Expand All @@ -108,7 +111,7 @@ func (c *collector) collectRuntimeMetrics(ctx context.Context, wg *sync.WaitGrou
}

// collectQueryHistoryMetrics collects and reports query history metrics, such as rows and bytes scanned, etc.
func (c *collector) collectQueryHistoryMetrics(ctx context.Context, wg *sync.WaitGroup, accountName string, engines []string, since, till time.Time) {
func (c *collector) collectQueryHistoryMetrics(ctx context.Context, wg *sync.WaitGroup, accountName string, engines []fetcher.Engine, since, till time.Time) {
slog.DebugContext(ctx, "start collecting query history metrics", slog.String("accountName", accountName))

pointsCh := c.fetcher.FetchQueryHistoryPoints(ctx, accountName, engines, since, till)
Expand All @@ -117,6 +120,7 @@ func (c *collector) collectQueryHistoryMetrics(ctx context.Context, wg *sync.Wai
attrs := []attribute.KeyValue{
attribute.Key("firebolt.account.name").String(accountName),
attribute.Key("firebolt.engine.name").String(mp.EngineName),
attribute.Key("firebolt.engine.status").String(mp.EngineStatus),
attribute.Key("firebolt.user.name").String(mp.UserName.String),
attribute.Key("firebolt.query.status").String(mp.Status.String),
}
Expand Down
21 changes: 13 additions & 8 deletions internal/collector/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"

"github.com/firebolt-db/otel-exporter/internal/fetcher"
)
Expand All @@ -29,19 +29,22 @@ func Test_Collector_Start(t *testing.T) {

interval := 25 * time.Millisecond

eng := []string{"engine1", "engine2"}
f.fetchEnginesFn = func(ctx context.Context, accountName string) ([]string, error) {
eng := []fetcher.Engine{
{Name: "engine1", Status: "RUNNING"},
{Name: "engine2", Status: "RESIZING"},
}
f.fetchEnginesFn = func(ctx context.Context, accountName string) ([]fetcher.Engine, error) {
require.Equal(t, acctName, accountName)
return eng, nil
}
rCh := make(chan fetcher.EngineRuntimePoint)
f.fetchRuntimePointsFn = func(ctx context.Context, account string, engines []string, since, till time.Time) <-chan fetcher.EngineRuntimePoint {
f.fetchRuntimePointsFn = func(ctx context.Context, account string, engines []fetcher.Engine, since, till time.Time) <-chan fetcher.EngineRuntimePoint {
require.Equal(t, acctName, account)
require.Equal(t, eng, engines)
return rCh
}
qhCh := make(chan fetcher.QueryHistoryPoint)
f.fetchQueryHistoryPointsFn = func(ctx context.Context, account string, engines []string, since, till time.Time) <-chan fetcher.QueryHistoryPoint {
f.fetchQueryHistoryPointsFn = func(ctx context.Context, account string, engines []fetcher.Engine, since, till time.Time) <-chan fetcher.QueryHistoryPoint {
require.Equal(t, acctName, account)
require.Equal(t, eng, engines)
return qhCh
Expand All @@ -64,13 +67,15 @@ func Test_Collector_Start(t *testing.T) {
sentCh := make(chan struct{})
go func() {
rCh <- fetcher.EngineRuntimePoint{
EngineName: "eng1",
EventTime: sql.Null[time.Time]{Valid: true, V: time.Now()},
CPUUsed: sql.NullFloat64{Valid: true, Float64: 10},
EngineName: "eng1",
EngineStatus: "RUNNING",
EventTime: sql.Null[time.Time]{Valid: true, V: time.Now()},
CPUUsed: sql.NullFloat64{Valid: true, Float64: 10},
}

qhCh <- fetcher.QueryHistoryPoint{
EngineName: "eng2",
EngineStatus: "RESIZING",
DurationMicroSeconds: sql.NullInt64{Valid: true, Int64: 10},
}
sentCh <- struct{}{}
Expand Down
18 changes: 9 additions & 9 deletions internal/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,32 @@ func Test_NewCollector_missing_exporter(t *testing.T) {
}

type fetcherMock struct {
fetchEnginesFn func(ctx context.Context, accountName string) ([]string, error)
fetchRuntimePointsFn func(ctx context.Context, account string, engines []string, since, till time.Time) <-chan fetcher.EngineRuntimePoint
fetchQueryHistoryPointsFn func(ctx context.Context, account string, engines []string, since, till time.Time) <-chan fetcher.QueryHistoryPoint
fetchEnginesFn func(ctx context.Context, accountName string) ([]fetcher.Engine, error)
fetchRuntimePointsFn func(ctx context.Context, account string, engines []fetcher.Engine, since, till time.Time) <-chan fetcher.EngineRuntimePoint
fetchQueryHistoryPointsFn func(ctx context.Context, account string, engines []fetcher.Engine, since, till time.Time) <-chan fetcher.QueryHistoryPoint
}

func newFetcherMock() *fetcherMock {
return &fetcherMock{
fetchEnginesFn: func(ctx context.Context, accountName string) ([]string, error) {
fetchEnginesFn: func(ctx context.Context, accountName string) ([]fetcher.Engine, error) {
panic("default FetchEngines")
},
fetchRuntimePointsFn: func(ctx context.Context, account string, engines []string, since, till time.Time) <-chan fetcher.EngineRuntimePoint {
fetchRuntimePointsFn: func(ctx context.Context, account string, engines []fetcher.Engine, since, till time.Time) <-chan fetcher.EngineRuntimePoint {
panic("default FetchRuntimePoints")
},
fetchQueryHistoryPointsFn: func(ctx context.Context, account string, engines []string, since, till time.Time) <-chan fetcher.QueryHistoryPoint {
fetchQueryHistoryPointsFn: func(ctx context.Context, account string, engines []fetcher.Engine, since, till time.Time) <-chan fetcher.QueryHistoryPoint {
panic("default FetchQueryHistoryPoints")
},
}
}

func (m *fetcherMock) FetchEngines(ctx context.Context, accountName string) ([]string, error) {
func (m *fetcherMock) FetchEngines(ctx context.Context, accountName string) ([]fetcher.Engine, error) {
return m.fetchEnginesFn(ctx, accountName)
}
func (m *fetcherMock) FetchRuntimePoints(ctx context.Context, account string, engines []string, since, till time.Time) <-chan fetcher.EngineRuntimePoint {
func (m *fetcherMock) FetchRuntimePoints(ctx context.Context, account string, engines []fetcher.Engine, since, till time.Time) <-chan fetcher.EngineRuntimePoint {
return m.fetchRuntimePointsFn(ctx, account, engines, since, till)
}
func (m *fetcherMock) FetchQueryHistoryPoints(ctx context.Context, account string, engines []string, since, till time.Time) <-chan fetcher.QueryHistoryPoint {
func (m *fetcherMock) FetchQueryHistoryPoints(ctx context.Context, account string, engines []fetcher.Engine, since, till time.Time) <-chan fetcher.QueryHistoryPoint {
return m.fetchQueryHistoryPointsFn(ctx, account, engines, since, till)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/collector/meter_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

const (
Expand Down
Loading

0 comments on commit 7369411

Please sign in to comment.