diff --git a/internal/collector/collect.go b/internal/collector/collect.go index a28f5e1..8d03f0a 100644 --- a/internal/collector/collect.go +++ b/internal/collector/collect.go @@ -93,13 +93,13 @@ func (c *collector) collectRuntimeMetrics(ctx context.Context, wg *sync.WaitGrou attrsSet := attribute.NewSet(attrs...) - c.runtimeMetrics.cpuUtilization.Record(ctx, mp.CPUUsed, api.WithAttributeSet(attrsSet)) - c.runtimeMetrics.memoryUtilization.Record(ctx, mp.MemoryUsed, api.WithAttributeSet(attrsSet)) - c.runtimeMetrics.diskUtilization.Record(ctx, mp.DiskUsed, api.WithAttributeSet(attrsSet)) - c.runtimeMetrics.cacheUtilization.Record(ctx, mp.CacheHitRatio, api.WithAttributeSet(attrsSet)) - c.runtimeMetrics.diskSpilled.Add(ctx, mp.SpilledBytes, api.WithAttributeSet(attrsSet)) - c.runtimeMetrics.runningQueries.Record(ctx, mp.RunningQueries, api.WithAttributeSet(attrsSet)) - c.runtimeMetrics.suspendedQueries.Record(ctx, mp.SuspendedQueries, api.WithAttributeSet(attrsSet)) + c.runtimeMetrics.cpuUtilization.Record(ctx, mp.CPUUsed.Float64, api.WithAttributeSet(attrsSet)) + c.runtimeMetrics.memoryUtilization.Record(ctx, mp.MemoryUsed.Float64, api.WithAttributeSet(attrsSet)) + c.runtimeMetrics.diskUtilization.Record(ctx, mp.DiskUsed.Float64, api.WithAttributeSet(attrsSet)) + c.runtimeMetrics.cacheUtilization.Record(ctx, mp.CacheHitRatio.Float64, api.WithAttributeSet(attrsSet)) + c.runtimeMetrics.diskSpilled.Add(ctx, mp.SpilledBytes.Int64, api.WithAttributeSet(attrsSet)) + c.runtimeMetrics.runningQueries.Record(ctx, mp.RunningQueries.Int64, api.WithAttributeSet(attrsSet)) + c.runtimeMetrics.suspendedQueries.Record(ctx, mp.SuspendedQueries.Int64, api.WithAttributeSet(attrsSet)) } wg.Done() @@ -117,21 +117,21 @@ func (c *collector) collectQueryHistoryMetrics(ctx context.Context, wg *sync.Wai attrs := []attribute.KeyValue{ attribute.Key("firebolt.account.name").String(accountName), attribute.Key("firebolt.engine.name").String(mp.EngineName), - attribute.Key("firebolt.user.name").String(mp.UserName), - attribute.Key("firebolt.query.status").String(mp.Status), + attribute.Key("firebolt.user.name").String(mp.UserName.String), + attribute.Key("firebolt.query.status").String(mp.Status.String), } attrsSet := attribute.NewSet(attrs...) - c.queryHistoryMetrics.queryDuration.Record(ctx, float64(mp.DurationMicroSeconds)/1000000, api.WithAttributeSet(attrsSet)) - c.queryHistoryMetrics.scannedRows.Add(ctx, mp.ScannedRows, api.WithAttributeSet(attrsSet)) - c.queryHistoryMetrics.scannedBytes.Add(ctx, mp.ScannedBytes, api.WithAttributeSet(attrsSet)) - c.queryHistoryMetrics.insertedRows.Add(ctx, mp.InsertedRows, api.WithAttributeSet(attrsSet)) - c.queryHistoryMetrics.insertedBytes.Add(ctx, mp.InsertedBytes, api.WithAttributeSet(attrsSet)) - c.queryHistoryMetrics.returnedRows.Add(ctx, mp.ReturnedRows, api.WithAttributeSet(attrsSet)) - c.queryHistoryMetrics.returnedBytes.Add(ctx, mp.ReturnedBytes, api.WithAttributeSet(attrsSet)) - c.queryHistoryMetrics.spilledBytes.Add(ctx, mp.SpilledBytes, api.WithAttributeSet(attrsSet)) - c.queryHistoryMetrics.queueTime.Add(ctx, float64(mp.TimeInQueueMicroSeconds)/1000000, api.WithAttributeSet(attrsSet)) + c.queryHistoryMetrics.queryDuration.Record(ctx, float64(mp.DurationMicroSeconds.Int64)/1000000, api.WithAttributeSet(attrsSet)) + c.queryHistoryMetrics.scannedRows.Add(ctx, mp.ScannedRows.Int64, api.WithAttributeSet(attrsSet)) + c.queryHistoryMetrics.scannedBytes.Add(ctx, mp.ScannedBytes.Int64, api.WithAttributeSet(attrsSet)) + c.queryHistoryMetrics.insertedRows.Add(ctx, mp.InsertedRows.Int64, api.WithAttributeSet(attrsSet)) + c.queryHistoryMetrics.insertedBytes.Add(ctx, mp.InsertedBytes.Int64, api.WithAttributeSet(attrsSet)) + c.queryHistoryMetrics.returnedRows.Add(ctx, mp.ReturnedRows.Int64, api.WithAttributeSet(attrsSet)) + c.queryHistoryMetrics.returnedBytes.Add(ctx, mp.ReturnedBytes.Int64, api.WithAttributeSet(attrsSet)) + c.queryHistoryMetrics.spilledBytes.Add(ctx, mp.SpilledBytes.Int64, api.WithAttributeSet(attrsSet)) + c.queryHistoryMetrics.queueTime.Add(ctx, float64(mp.TimeInQueueMicroSeconds.Int64)/1000000, api.WithAttributeSet(attrsSet)) } wg.Done() diff --git a/internal/collector/collect_test.go b/internal/collector/collect_test.go index 6186706..b935b94 100644 --- a/internal/collector/collect_test.go +++ b/internal/collector/collect_test.go @@ -2,6 +2,7 @@ package collector import ( "context" + "database/sql" "sync/atomic" "testing" "time" @@ -64,13 +65,13 @@ func Test_Collector_Start(t *testing.T) { go func() { rCh <- fetcher.EngineRuntimePoint{ EngineName: "eng1", - EventTime: time.Now(), - CPUUsed: 10, + EventTime: sql.Null[time.Time]{Valid: true, V: time.Now()}, + CPUUsed: sql.NullFloat64{Valid: true, Float64: 10}, } qhCh <- fetcher.QueryHistoryPoint{ EngineName: "eng2", - DurationMicroSeconds: 10, + DurationMicroSeconds: sql.NullInt64{Valid: true, Int64: 10}, } sentCh <- struct{}{} }() diff --git a/internal/fetcher/fetcher.go b/internal/fetcher/fetcher.go index 1c8ba06..affdb4b 100644 --- a/internal/fetcher/fetcher.go +++ b/internal/fetcher/fetcher.go @@ -185,9 +185,7 @@ func (f *fetcher) FetchQueryHistoryPoints(ctx context.Context, account string, e for rows.Next() { qhp := QueryHistoryPoint{EngineName: engineName} - userName, accountName := sql.NullString{}, sql.NullString{} - - if err := rows.Scan(&accountName, &userName, &qhp.DurationMicroSeconds, &qhp.Status, + if err := rows.Scan(&qhp.AccountName, &qhp.UserName, &qhp.DurationMicroSeconds, &qhp.Status, &qhp.ScannedRows, &qhp.ScannedBytes, &qhp.InsertedRows, &qhp.InsertedBytes, &qhp.SpilledBytes, &qhp.ReturnedRows, &qhp.ReturnedBytes, &qhp.TimeInQueueMicroSeconds, ); err != nil { @@ -198,13 +196,6 @@ func (f *fetcher) FetchQueryHistoryPoints(ctx context.Context, account string, e return } - if userName.Valid { - qhp.UserName = userName.String - } - if accountName.Valid { - qhp.AccountName = accountName.String - } - ch <- qhp } }(engine) diff --git a/internal/fetcher/model.go b/internal/fetcher/model.go index 93f8e74..795ecfd 100644 --- a/internal/fetcher/model.go +++ b/internal/fetcher/model.go @@ -9,15 +9,15 @@ import ( type EngineRuntimePoint struct { EngineName string - EngineCluster string - EventTime time.Time - CPUUsed float64 - MemoryUsed float64 - DiskUsed float64 - CacheHitRatio float64 - SpilledBytes int64 - RunningQueries int64 - SuspendedQueries int64 + EngineCluster sql.NullString + EventTime sql.Null[time.Time] + CPUUsed sql.NullFloat64 + MemoryUsed sql.NullFloat64 + DiskUsed sql.NullFloat64 + CacheHitRatio sql.NullFloat64 + SpilledBytes sql.NullInt64 + RunningQueries sql.NullInt64 + SuspendedQueries sql.NullInt64 } // Scan fills in EngineRuntimePoint fields from a single row. @@ -39,18 +39,18 @@ func (p *EngineRuntimePoint) Scan(row *sql.Row) error { type QueryHistoryPoint struct { EngineName string - AccountName string - UserName string + AccountName sql.NullString + UserName sql.NullString - DurationMicroSeconds int64 - Status string + DurationMicroSeconds sql.NullInt64 + Status sql.NullString - ScannedRows int64 - ScannedBytes int64 - InsertedRows int64 - InsertedBytes int64 - SpilledBytes int64 - ReturnedRows int64 - ReturnedBytes int64 - TimeInQueueMicroSeconds int64 + ScannedRows sql.NullInt64 + ScannedBytes sql.NullInt64 + InsertedRows sql.NullInt64 + InsertedBytes sql.NullInt64 + SpilledBytes sql.NullInt64 + ReturnedRows sql.NullInt64 + ReturnedBytes sql.NullInt64 + TimeInQueueMicroSeconds sql.NullInt64 }