Skip to content

Commit

Permalink
chore: handle all columns as nullable in queries to engine_metrics_hi…
Browse files Browse the repository at this point in the history
…story and engine_query_history (#11)

All columns in views `engine_metrics_history` and `engine_query_history`
are nullable, so handling of the queries was updated
  • Loading branch information
alexkaplun-firebolt authored Jun 27, 2024
1 parent c647c25 commit b243106
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 52 deletions.
36 changes: 18 additions & 18 deletions internal/collector/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ func (c *collector) collectRuntimeMetrics(ctx context.Context, wg *sync.WaitGrou

attrsSet := attribute.NewSet(attrs...)

c.runtimeMetrics.cpuUtilization.Record(ctx, mp.CPUUsed, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.memoryUtilization.Record(ctx, mp.MemoryUsed, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.diskUtilization.Record(ctx, mp.DiskUsed, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.cacheUtilization.Record(ctx, mp.CacheHitRatio, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.diskSpilled.Add(ctx, mp.SpilledBytes, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.runningQueries.Record(ctx, mp.RunningQueries, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.suspendedQueries.Record(ctx, mp.SuspendedQueries, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.cpuUtilization.Record(ctx, mp.CPUUsed.Float64, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.memoryUtilization.Record(ctx, mp.MemoryUsed.Float64, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.diskUtilization.Record(ctx, mp.DiskUsed.Float64, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.cacheUtilization.Record(ctx, mp.CacheHitRatio.Float64, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.diskSpilled.Add(ctx, mp.SpilledBytes.Int64, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.runningQueries.Record(ctx, mp.RunningQueries.Int64, api.WithAttributeSet(attrsSet))
c.runtimeMetrics.suspendedQueries.Record(ctx, mp.SuspendedQueries.Int64, api.WithAttributeSet(attrsSet))
}

wg.Done()
Expand All @@ -117,21 +117,21 @@ func (c *collector) collectQueryHistoryMetrics(ctx context.Context, wg *sync.Wai
attrs := []attribute.KeyValue{
attribute.Key("firebolt.account.name").String(accountName),
attribute.Key("firebolt.engine.name").String(mp.EngineName),
attribute.Key("firebolt.user.name").String(mp.UserName),
attribute.Key("firebolt.query.status").String(mp.Status),
attribute.Key("firebolt.user.name").String(mp.UserName.String),
attribute.Key("firebolt.query.status").String(mp.Status.String),
}

attrsSet := attribute.NewSet(attrs...)

c.queryHistoryMetrics.queryDuration.Record(ctx, float64(mp.DurationMicroSeconds)/1000000, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.scannedRows.Add(ctx, mp.ScannedRows, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.scannedBytes.Add(ctx, mp.ScannedBytes, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.insertedRows.Add(ctx, mp.InsertedRows, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.insertedBytes.Add(ctx, mp.InsertedBytes, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.returnedRows.Add(ctx, mp.ReturnedRows, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.returnedBytes.Add(ctx, mp.ReturnedBytes, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.spilledBytes.Add(ctx, mp.SpilledBytes, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.queueTime.Add(ctx, float64(mp.TimeInQueueMicroSeconds)/1000000, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.queryDuration.Record(ctx, float64(mp.DurationMicroSeconds.Int64)/1000000, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.scannedRows.Add(ctx, mp.ScannedRows.Int64, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.scannedBytes.Add(ctx, mp.ScannedBytes.Int64, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.insertedRows.Add(ctx, mp.InsertedRows.Int64, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.insertedBytes.Add(ctx, mp.InsertedBytes.Int64, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.returnedRows.Add(ctx, mp.ReturnedRows.Int64, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.returnedBytes.Add(ctx, mp.ReturnedBytes.Int64, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.spilledBytes.Add(ctx, mp.SpilledBytes.Int64, api.WithAttributeSet(attrsSet))
c.queryHistoryMetrics.queueTime.Add(ctx, float64(mp.TimeInQueueMicroSeconds.Int64)/1000000, api.WithAttributeSet(attrsSet))
}

wg.Done()
Expand Down
7 changes: 4 additions & 3 deletions internal/collector/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package collector

import (
"context"
"database/sql"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -64,13 +65,13 @@ func Test_Collector_Start(t *testing.T) {
go func() {
rCh <- fetcher.EngineRuntimePoint{
EngineName: "eng1",
EventTime: time.Now(),
CPUUsed: 10,
EventTime: sql.Null[time.Time]{Valid: true, V: time.Now()},
CPUUsed: sql.NullFloat64{Valid: true, Float64: 10},
}

qhCh <- fetcher.QueryHistoryPoint{
EngineName: "eng2",
DurationMicroSeconds: 10,
DurationMicroSeconds: sql.NullInt64{Valid: true, Int64: 10},
}
sentCh <- struct{}{}
}()
Expand Down
11 changes: 1 addition & 10 deletions internal/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,7 @@ func (f *fetcher) FetchQueryHistoryPoints(ctx context.Context, account string, e
for rows.Next() {
qhp := QueryHistoryPoint{EngineName: engineName}

userName, accountName := sql.NullString{}, sql.NullString{}

if err := rows.Scan(&accountName, &userName, &qhp.DurationMicroSeconds, &qhp.Status,
if err := rows.Scan(&qhp.AccountName, &qhp.UserName, &qhp.DurationMicroSeconds, &qhp.Status,
&qhp.ScannedRows, &qhp.ScannedBytes, &qhp.InsertedRows, &qhp.InsertedBytes, &qhp.SpilledBytes,
&qhp.ReturnedRows, &qhp.ReturnedBytes, &qhp.TimeInQueueMicroSeconds,
); err != nil {
Expand All @@ -198,13 +196,6 @@ func (f *fetcher) FetchQueryHistoryPoints(ctx context.Context, account string, e
return
}

if userName.Valid {
qhp.UserName = userName.String
}
if accountName.Valid {
qhp.AccountName = accountName.String
}

ch <- qhp
}
}(engine)
Expand Down
42 changes: 21 additions & 21 deletions internal/fetcher/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
type EngineRuntimePoint struct {
EngineName string

EngineCluster string
EventTime time.Time
CPUUsed float64
MemoryUsed float64
DiskUsed float64
CacheHitRatio float64
SpilledBytes int64
RunningQueries int64
SuspendedQueries int64
EngineCluster sql.NullString
EventTime sql.Null[time.Time]
CPUUsed sql.NullFloat64
MemoryUsed sql.NullFloat64
DiskUsed sql.NullFloat64
CacheHitRatio sql.NullFloat64
SpilledBytes sql.NullInt64
RunningQueries sql.NullInt64
SuspendedQueries sql.NullInt64
}

// Scan fills in EngineRuntimePoint fields from a single row.
Expand All @@ -39,18 +39,18 @@ func (p *EngineRuntimePoint) Scan(row *sql.Row) error {
type QueryHistoryPoint struct {
EngineName string

AccountName string
UserName string
AccountName sql.NullString
UserName sql.NullString

DurationMicroSeconds int64
Status string
DurationMicroSeconds sql.NullInt64
Status sql.NullString

ScannedRows int64
ScannedBytes int64
InsertedRows int64
InsertedBytes int64
SpilledBytes int64
ReturnedRows int64
ReturnedBytes int64
TimeInQueueMicroSeconds int64
ScannedRows sql.NullInt64
ScannedBytes sql.NullInt64
InsertedRows sql.NullInt64
InsertedBytes sql.NullInt64
SpilledBytes sql.NullInt64
ReturnedRows sql.NullInt64
ReturnedBytes sql.NullInt64
TimeInQueueMicroSeconds sql.NullInt64
}

0 comments on commit b243106

Please sign in to comment.