Skip to content

Commit

Permalink
Replace SyncGauge with upstream otel sdk gauges
Browse files Browse the repository at this point in the history
Upstream added gauges in May: open-telemetry/opentelemetry-go#5304
  • Loading branch information
serprex committed Nov 19, 2024
1 parent 61fe601 commit 6e8420d
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 119 deletions.
5 changes: 3 additions & 2 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime"

"github.com/grafana/pyroscope-go"
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/client"
temporalotel "go.temporal.io/sdk/contrib/opentelemetry"
"go.temporal.io/sdk/worker"
Expand Down Expand Up @@ -157,8 +158,8 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
otelManager = &otel_metrics.OtelManager{
MetricsProvider: metricsProvider,
Meter: metricsProvider.Meter("io.peerdb.flow-worker"),
Float64GaugesCache: make(map[string]*otel_metrics.Float64SyncGauge),
Int64GaugesCache: make(map[string]*otel_metrics.Int64SyncGauge),
Float64GaugesCache: make(map[string]metric.Float64Gauge),
Int64GaugesCache: make(map[string]metric.Int64Gauge),
}
cleanupOtelManagerFunc = func() {
shutDownErr := otelManager.MetricsProvider.Shutdown(context.Background())
Expand Down
33 changes: 21 additions & 12 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"

Expand Down Expand Up @@ -1214,11 +1215,12 @@ func (c *PostgresConnector) HandleSlotInfo(
logger.Info(fmt.Sprintf("Checking %s lag for %s", alertKeys.SlotName, alertKeys.PeerName),
slog.Float64("LagInMB", float64(slotInfo[0].LagInMb)))
alerter.AlertIfSlotLag(ctx, alertKeys, slotInfo[0])
slotMetricGauges.SlotLagGauge.Set(float64(slotInfo[0].LagInMb), attribute.NewSet(
slotMetricGauges.SlotLagGauge.Record(ctx, float64(slotInfo[0].LagInMb), metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.SlotNameKey, alertKeys.SlotName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())),
))

// Also handles alerts for PeerDB user connections exceeding a given limit here
res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User)
Expand All @@ -1227,21 +1229,25 @@ func (c *PostgresConnector) HandleSlotInfo(
return err
}
alerter.AlertIfOpenConnections(ctx, alertKeys, res)
slotMetricGauges.OpenConnectionsGauge.Set(res.CurrentOpenConnections, attribute.NewSet(
slotMetricGauges.OpenConnectionsGauge.Record(ctx, res.CurrentOpenConnections, metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)))

replicationRes, err := getOpenReplicationConnectionsForUser(ctx, c.conn, c.config.User)
if err != nil {
logger.Warn("warning: failed to get current open replication connections", "error", err)
return err
}

slotMetricGauges.OpenReplicationConnectionsGauge.Set(replicationRes.CurrentOpenConnections, attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
slotMetricGauges.OpenReplicationConnectionsGauge.Record(ctx, replicationRes.CurrentOpenConnections,
metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)),
)

var intervalSinceLastNormalize *time.Duration
if err := alerter.CatalogPool.QueryRow(
Expand All @@ -1255,10 +1261,13 @@ func (c *PostgresConnector) HandleSlotInfo(
return nil
}
if intervalSinceLastNormalize != nil {
slotMetricGauges.IntervalSinceLastNormalizeGauge.Set(intervalSinceLastNormalize.Seconds(), attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
slotMetricGauges.IntervalSinceLastNormalizeGauge.Record(ctx, intervalSinceLastNormalize.Seconds(),
metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)),
)
alerter.AlertIfTooLongSinceLastNormalize(ctx, alertKeys, *intervalSinceLastNormalize)
}

Expand Down
4 changes: 2 additions & 2 deletions flow/otel_metrics/otel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
type OtelManager struct {
MetricsProvider *sdkmetric.MeterProvider
Meter metric.Meter
Float64GaugesCache map[string]*Float64SyncGauge
Int64GaugesCache map[string]*Int64SyncGauge
Float64GaugesCache map[string]metric.Float64Gauge
Int64GaugesCache map[string]metric.Int64Gauge
}

// newOtelResource returns a resource describing this application.
Expand Down
10 changes: 6 additions & 4 deletions flow/otel_metrics/peerdb_gauges/gauges.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package peerdb_gauges

import (
"go.opentelemetry.io/otel/metric"

"github.com/PeerDB-io/peer-flow/otel_metrics"
)

Expand All @@ -12,10 +14,10 @@ const (
)

type SlotMetricGauges struct {
SlotLagGauge *otel_metrics.Float64SyncGauge
OpenConnectionsGauge *otel_metrics.Int64SyncGauge
OpenReplicationConnectionsGauge *otel_metrics.Int64SyncGauge
IntervalSinceLastNormalizeGauge *otel_metrics.Float64SyncGauge
SlotLagGauge metric.Float64Gauge
OpenConnectionsGauge metric.Int64Gauge
OpenReplicationConnectionsGauge metric.Int64Gauge
IntervalSinceLastNormalizeGauge metric.Float64Gauge
}

func BuildGaugeName(baseGaugeName string) string {
Expand Down
107 changes: 8 additions & 99 deletions flow/otel_metrics/sync_gauges.go
Original file line number Diff line number Diff line change
@@ -1,121 +1,30 @@
package otel_metrics

import (
"context"
"fmt"
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

type ObservationMapValue[V comparable] struct {
Value V
}

// SyncGauge is a generic synchronous gauge that can be used to observe any type of value
// Inspired from https://github.com/open-telemetry/opentelemetry-go/issues/3984#issuecomment-1743231837
type SyncGauge[V comparable, O metric.Observable] struct {
observableGauge O
observations sync.Map
name string
}

func (a *SyncGauge[V, O]) Callback(ctx context.Context, observeFunc func(value V, options ...metric.ObserveOption)) error {
a.observations.Range(func(key, value interface{}) bool {
attrs := key.(attribute.Set)
val := value.(*ObservationMapValue[V])
observeFunc(val.Value, metric.WithAttributeSet(attrs))
// If the pointer is still same we can safely delete, else it means that the value was overwritten in parallel
a.observations.CompareAndDelete(attrs, val)
return true
})
return nil
}

func (a *SyncGauge[V, O]) Set(input V, attrs attribute.Set) {
val := ObservationMapValue[V]{Value: input}
a.observations.Store(attrs, &val)
}

type Int64SyncGauge struct {
syncGauge *SyncGauge[int64, metric.Int64Observable]
}

func (a *Int64SyncGauge) Set(input int64, attrs attribute.Set) {
if a == nil {
return
}
a.syncGauge.Set(input, attrs)
}

func NewInt64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Int64ObservableGaugeOption) (*Int64SyncGauge, error) {
syncGauge := &SyncGauge[int64, metric.Int64Observable]{
name: gaugeName,
}
observableGauge, err := meter.Int64ObservableGauge(gaugeName,
append(opts, metric.WithInt64Callback(func(ctx context.Context, observer metric.Int64Observer) error {
return syncGauge.Callback(ctx, func(value int64, options ...metric.ObserveOption) {
observer.Observe(value, options...)
})
}))...)
if err != nil {
return nil, fmt.Errorf("failed to create Int64SyncGauge: %w", err)
}
syncGauge.observableGauge = observableGauge
return &Int64SyncGauge{syncGauge: syncGauge}, nil
}

type Float64SyncGauge struct {
syncGauge *SyncGauge[float64, metric.Float64Observable]
}

func (a *Float64SyncGauge) Set(input float64, attrs attribute.Set) {
if a == nil {
return
}
a.syncGauge.Set(input, attrs)
}

func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Float64ObservableGaugeOption) (*Float64SyncGauge, error) {
syncGauge := &SyncGauge[float64, metric.Float64Observable]{
name: gaugeName,
}
observableGauge, err := meter.Float64ObservableGauge(gaugeName,
append(opts, metric.WithFloat64Callback(func(ctx context.Context, observer metric.Float64Observer) error {
return syncGauge.Callback(ctx, func(value float64, options ...metric.ObserveOption) {
observer.Observe(value, options...)
})
}))...)
if err != nil {
return nil, fmt.Errorf("failed to create Float64SyncGauge: %w", err)
}
syncGauge.observableGauge = observableGauge
return &Float64SyncGauge{syncGauge: syncGauge}, nil
}

func GetOrInitInt64SyncGauge(meter metric.Meter, cache map[string]*Int64SyncGauge, name string,
opts ...metric.Int64ObservableGaugeOption,
) (*Int64SyncGauge, error) {
func GetOrInitInt64SyncGauge(meter metric.Meter, cache map[string]metric.Int64Gauge, name string, opts ...metric.Int64GaugeOption,
) (metric.Int64Gauge, error) {
gauge, ok := cache[name]
if !ok {
var err error
gauge, err = NewInt64SyncGauge(meter, name, opts...)
gauge, err = meter.Int64Gauge(name, opts...)
if err != nil {
return nil, err
var none metric.Int64Gauge
return none, err
}
cache[name] = gauge
}
return gauge, nil
}

func GetOrInitFloat64SyncGauge(meter metric.Meter, cache map[string]*Float64SyncGauge,
name string, opts ...metric.Float64ObservableGaugeOption,
) (*Float64SyncGauge, error) {
func GetOrInitFloat64SyncGauge(meter metric.Meter, cache map[string]metric.Float64Gauge, name string, opts ...metric.Float64GaugeOption,
) (metric.Float64Gauge, error) {
gauge, ok := cache[name]
if !ok {
var err error
gauge, err = NewFloat64SyncGauge(meter, name, opts...)
gauge, err = meter.Float64Gauge(name, opts...)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 6e8420d

Please sign in to comment.