Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace SyncGauge with upstream otel sdk gauges #2272

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime"

"github.com/grafana/pyroscope-go"
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/client"
temporalotel "go.temporal.io/sdk/contrib/opentelemetry"
"go.temporal.io/sdk/worker"
Expand Down Expand Up @@ -157,8 +158,8 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
otelManager = &otel_metrics.OtelManager{
MetricsProvider: metricsProvider,
Meter: metricsProvider.Meter("io.peerdb.flow-worker"),
Float64GaugesCache: make(map[string]*otel_metrics.Float64SyncGauge),
Int64GaugesCache: make(map[string]*otel_metrics.Int64SyncGauge),
Float64GaugesCache: make(map[string]metric.Float64Gauge),
Int64GaugesCache: make(map[string]metric.Int64Gauge),
}
cleanupOtelManagerFunc = func() {
shutDownErr := otelManager.MetricsProvider.Shutdown(context.Background())
Expand Down
33 changes: 21 additions & 12 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"

Expand Down Expand Up @@ -1214,11 +1215,12 @@ func (c *PostgresConnector) HandleSlotInfo(
logger.Info(fmt.Sprintf("Checking %s lag for %s", alertKeys.SlotName, alertKeys.PeerName),
slog.Float64("LagInMB", float64(slotInfo[0].LagInMb)))
alerter.AlertIfSlotLag(ctx, alertKeys, slotInfo[0])
slotMetricGauges.SlotLagGauge.Set(float64(slotInfo[0].LagInMb), attribute.NewSet(
slotMetricGauges.SlotLagGauge.Record(ctx, float64(slotInfo[0].LagInMb), metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.SlotNameKey, alertKeys.SlotName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())),
))

// Also handles alerts for PeerDB user connections exceeding a given limit here
res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User)
Expand All @@ -1227,21 +1229,25 @@ func (c *PostgresConnector) HandleSlotInfo(
return err
}
alerter.AlertIfOpenConnections(ctx, alertKeys, res)
slotMetricGauges.OpenConnectionsGauge.Set(res.CurrentOpenConnections, attribute.NewSet(
slotMetricGauges.OpenConnectionsGauge.Record(ctx, res.CurrentOpenConnections, metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)))

replicationRes, err := getOpenReplicationConnectionsForUser(ctx, c.conn, c.config.User)
if err != nil {
logger.Warn("warning: failed to get current open replication connections", "error", err)
return err
}

slotMetricGauges.OpenReplicationConnectionsGauge.Set(replicationRes.CurrentOpenConnections, attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
slotMetricGauges.OpenReplicationConnectionsGauge.Record(ctx, replicationRes.CurrentOpenConnections,
metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)),
)

var intervalSinceLastNormalize *time.Duration
if err := alerter.CatalogPool.QueryRow(
Expand All @@ -1255,10 +1261,13 @@ func (c *PostgresConnector) HandleSlotInfo(
return nil
}
if intervalSinceLastNormalize != nil {
slotMetricGauges.IntervalSinceLastNormalizeGauge.Set(intervalSinceLastNormalize.Seconds(), attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
slotMetricGauges.IntervalSinceLastNormalizeGauge.Record(ctx, intervalSinceLastNormalize.Seconds(),
metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)),
)
alerter.AlertIfTooLongSinceLastNormalize(ctx, alertKeys, *intervalSinceLastNormalize)
}

Expand Down
4 changes: 2 additions & 2 deletions flow/otel_metrics/otel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
type OtelManager struct {
MetricsProvider *sdkmetric.MeterProvider
Meter metric.Meter
Float64GaugesCache map[string]*Float64SyncGauge
Int64GaugesCache map[string]*Int64SyncGauge
Float64GaugesCache map[string]metric.Float64Gauge
Int64GaugesCache map[string]metric.Int64Gauge
}

// newOtelResource returns a resource describing this application.
Expand Down
10 changes: 6 additions & 4 deletions flow/otel_metrics/peerdb_gauges/gauges.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package peerdb_gauges

import (
"go.opentelemetry.io/otel/metric"

"github.com/PeerDB-io/peer-flow/otel_metrics"
)

Expand All @@ -12,10 +14,10 @@ const (
)

type SlotMetricGauges struct {
SlotLagGauge *otel_metrics.Float64SyncGauge
OpenConnectionsGauge *otel_metrics.Int64SyncGauge
OpenReplicationConnectionsGauge *otel_metrics.Int64SyncGauge
IntervalSinceLastNormalizeGauge *otel_metrics.Float64SyncGauge
SlotLagGauge metric.Float64Gauge
OpenConnectionsGauge metric.Int64Gauge
OpenReplicationConnectionsGauge metric.Int64Gauge
IntervalSinceLastNormalizeGauge metric.Float64Gauge
}

func BuildGaugeName(baseGaugeName string) string {
Expand Down
104 changes: 6 additions & 98 deletions flow/otel_metrics/sync_gauges.go
Original file line number Diff line number Diff line change
@@ -1,106 +1,15 @@
package otel_metrics

import (
"context"
"fmt"
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

type ObservationMapValue[V comparable] struct {
Value V
}

// SyncGauge is a generic synchronous gauge that can be used to observe any type of value
// Inspired from https://github.com/open-telemetry/opentelemetry-go/issues/3984#issuecomment-1743231837
type SyncGauge[V comparable, O metric.Observable] struct {
observableGauge O
observations sync.Map
name string
}

func (a *SyncGauge[V, O]) Callback(ctx context.Context, observeFunc func(value V, options ...metric.ObserveOption)) error {
a.observations.Range(func(key, value interface{}) bool {
attrs := key.(attribute.Set)
val := value.(*ObservationMapValue[V])
observeFunc(val.Value, metric.WithAttributeSet(attrs))
// If the pointer is still same we can safely delete, else it means that the value was overwritten in parallel
a.observations.CompareAndDelete(attrs, val)
return true
})
return nil
}

func (a *SyncGauge[V, O]) Set(input V, attrs attribute.Set) {
val := ObservationMapValue[V]{Value: input}
a.observations.Store(attrs, &val)
}

type Int64SyncGauge struct {
syncGauge *SyncGauge[int64, metric.Int64Observable]
}

func (a *Int64SyncGauge) Set(input int64, attrs attribute.Set) {
if a == nil {
return
}
a.syncGauge.Set(input, attrs)
}

func NewInt64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Int64ObservableGaugeOption) (*Int64SyncGauge, error) {
syncGauge := &SyncGauge[int64, metric.Int64Observable]{
name: gaugeName,
}
observableGauge, err := meter.Int64ObservableGauge(gaugeName,
append(opts, metric.WithInt64Callback(func(ctx context.Context, observer metric.Int64Observer) error {
return syncGauge.Callback(ctx, func(value int64, options ...metric.ObserveOption) {
observer.Observe(value, options...)
})
}))...)
if err != nil {
return nil, fmt.Errorf("failed to create Int64SyncGauge: %w", err)
}
syncGauge.observableGauge = observableGauge
return &Int64SyncGauge{syncGauge: syncGauge}, nil
}

type Float64SyncGauge struct {
syncGauge *SyncGauge[float64, metric.Float64Observable]
}

func (a *Float64SyncGauge) Set(input float64, attrs attribute.Set) {
if a == nil {
return
}
a.syncGauge.Set(input, attrs)
}

func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Float64ObservableGaugeOption) (*Float64SyncGauge, error) {
syncGauge := &SyncGauge[float64, metric.Float64Observable]{
name: gaugeName,
}
observableGauge, err := meter.Float64ObservableGauge(gaugeName,
append(opts, metric.WithFloat64Callback(func(ctx context.Context, observer metric.Float64Observer) error {
return syncGauge.Callback(ctx, func(value float64, options ...metric.ObserveOption) {
observer.Observe(value, options...)
})
}))...)
if err != nil {
return nil, fmt.Errorf("failed to create Float64SyncGauge: %w", err)
}
syncGauge.observableGauge = observableGauge
return &Float64SyncGauge{syncGauge: syncGauge}, nil
}

func GetOrInitInt64SyncGauge(meter metric.Meter, cache map[string]*Int64SyncGauge, name string,
opts ...metric.Int64ObservableGaugeOption,
) (*Int64SyncGauge, error) {
func GetOrInitInt64SyncGauge(meter metric.Meter, cache map[string]metric.Int64Gauge, name string, opts ...metric.Int64GaugeOption,
) (metric.Int64Gauge, error) {
gauge, ok := cache[name]
if !ok {
var err error
gauge, err = NewInt64SyncGauge(meter, name, opts...)
gauge, err = meter.Int64Gauge(name, opts...)
if err != nil {
return nil, err
}
Expand All @@ -109,13 +18,12 @@ func GetOrInitInt64SyncGauge(meter metric.Meter, cache map[string]*Int64SyncGaug
return gauge, nil
}

func GetOrInitFloat64SyncGauge(meter metric.Meter, cache map[string]*Float64SyncGauge,
name string, opts ...metric.Float64ObservableGaugeOption,
) (*Float64SyncGauge, error) {
func GetOrInitFloat64SyncGauge(meter metric.Meter, cache map[string]metric.Float64Gauge, name string, opts ...metric.Float64GaugeOption,
) (metric.Float64Gauge, error) {
gauge, ok := cache[name]
if !ok {
var err error
gauge, err = NewFloat64SyncGauge(meter, name, opts...)
gauge, err = meter.Float64Gauge(name, opts...)
if err != nil {
return nil, err
}
Expand Down
Loading