diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 5c16376a12..cca0202ec7 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -10,6 +10,7 @@ import ( "runtime" "github.com/grafana/pyroscope-go" + "go.opentelemetry.io/otel/metric" "go.temporal.io/sdk/client" temporalotel "go.temporal.io/sdk/contrib/opentelemetry" "go.temporal.io/sdk/worker" @@ -157,8 +158,8 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { otelManager = &otel_metrics.OtelManager{ MetricsProvider: metricsProvider, Meter: metricsProvider.Meter("io.peerdb.flow-worker"), - Float64GaugesCache: make(map[string]*otel_metrics.Float64SyncGauge), - Int64GaugesCache: make(map[string]*otel_metrics.Int64SyncGauge), + Float64GaugesCache: make(map[string]metric.Float64Gauge), + Int64GaugesCache: make(map[string]metric.Int64Gauge), } cleanupOtelManagerFunc = func() { shutDownErr := otelManager.MetricsProvider.Shutdown(context.Background()) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 593a94aa49..ae0dbea52d 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -17,6 +17,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -1214,11 +1215,12 @@ func (c *PostgresConnector) HandleSlotInfo( logger.Info(fmt.Sprintf("Checking %s lag for %s", alertKeys.SlotName, alertKeys.PeerName), slog.Float64("LagInMB", float64(slotInfo[0].LagInMb))) alerter.AlertIfSlotLag(ctx, alertKeys, slotInfo[0]) - slotMetricGauges.SlotLagGauge.Set(float64(slotInfo[0].LagInMb), attribute.NewSet( + slotMetricGauges.SlotLagGauge.Record(ctx, float64(slotInfo[0].LagInMb), metric.WithAttributeSet(attribute.NewSet( attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), attribute.String(otel_metrics.SlotNameKey, alertKeys.SlotName), - attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())), + )) // Also handles alerts for PeerDB user connections exceeding a given limit here res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User) @@ -1227,10 +1229,11 @@ func (c *PostgresConnector) HandleSlotInfo( return err } alerter.AlertIfOpenConnections(ctx, alertKeys, res) - slotMetricGauges.OpenConnectionsGauge.Set(res.CurrentOpenConnections, attribute.NewSet( + slotMetricGauges.OpenConnectionsGauge.Record(ctx, res.CurrentOpenConnections, metric.WithAttributeSet(attribute.NewSet( attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), - attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()), + ))) replicationRes, err := getOpenReplicationConnectionsForUser(ctx, c.conn, c.config.User) if err != nil { @@ -1238,10 +1241,13 @@ func (c *PostgresConnector) HandleSlotInfo( return err } - slotMetricGauges.OpenReplicationConnectionsGauge.Set(replicationRes.CurrentOpenConnections, attribute.NewSet( - attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), - attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), - attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + slotMetricGauges.OpenReplicationConnectionsGauge.Record(ctx, replicationRes.CurrentOpenConnections, + metric.WithAttributeSet(attribute.NewSet( + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()), + )), + ) var intervalSinceLastNormalize *time.Duration if err := alerter.CatalogPool.QueryRow( @@ -1255,10 +1261,13 @@ func (c *PostgresConnector) HandleSlotInfo( return nil } if intervalSinceLastNormalize != nil { - slotMetricGauges.IntervalSinceLastNormalizeGauge.Set(intervalSinceLastNormalize.Seconds(), attribute.NewSet( - attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), - attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), - attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + slotMetricGauges.IntervalSinceLastNormalizeGauge.Record(ctx, intervalSinceLastNormalize.Seconds(), + metric.WithAttributeSet(attribute.NewSet( + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()), + )), + ) alerter.AlertIfTooLongSinceLastNormalize(ctx, alertKeys, *intervalSinceLastNormalize) } diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go index becf13a16f..c59adecd41 100644 --- a/flow/otel_metrics/otel_manager.go +++ b/flow/otel_metrics/otel_manager.go @@ -20,8 +20,8 @@ import ( type OtelManager struct { MetricsProvider *sdkmetric.MeterProvider Meter metric.Meter - Float64GaugesCache map[string]*Float64SyncGauge - Int64GaugesCache map[string]*Int64SyncGauge + Float64GaugesCache map[string]metric.Float64Gauge + Int64GaugesCache map[string]metric.Int64Gauge } // newOtelResource returns a resource describing this application. diff --git a/flow/otel_metrics/peerdb_gauges/gauges.go b/flow/otel_metrics/peerdb_gauges/gauges.go index 767aac0945..a3b7d5c3e8 100644 --- a/flow/otel_metrics/peerdb_gauges/gauges.go +++ b/flow/otel_metrics/peerdb_gauges/gauges.go @@ -1,6 +1,8 @@ package peerdb_gauges import ( + "go.opentelemetry.io/otel/metric" + "github.com/PeerDB-io/peer-flow/otel_metrics" ) @@ -12,10 +14,10 @@ const ( ) type SlotMetricGauges struct { - SlotLagGauge *otel_metrics.Float64SyncGauge - OpenConnectionsGauge *otel_metrics.Int64SyncGauge - OpenReplicationConnectionsGauge *otel_metrics.Int64SyncGauge - IntervalSinceLastNormalizeGauge *otel_metrics.Float64SyncGauge + SlotLagGauge metric.Float64Gauge + OpenConnectionsGauge metric.Int64Gauge + OpenReplicationConnectionsGauge metric.Int64Gauge + IntervalSinceLastNormalizeGauge metric.Float64Gauge } func BuildGaugeName(baseGaugeName string) string { diff --git a/flow/otel_metrics/sync_gauges.go b/flow/otel_metrics/sync_gauges.go index d2ef4924c1..e9da02c875 100644 --- a/flow/otel_metrics/sync_gauges.go +++ b/flow/otel_metrics/sync_gauges.go @@ -1,106 +1,15 @@ package otel_metrics import ( - "context" - "fmt" - "sync" - - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" ) -type ObservationMapValue[V comparable] struct { - Value V -} - -// SyncGauge is a generic synchronous gauge that can be used to observe any type of value -// Inspired from https://github.com/open-telemetry/opentelemetry-go/issues/3984#issuecomment-1743231837 -type SyncGauge[V comparable, O metric.Observable] struct { - observableGauge O - observations sync.Map - name string -} - -func (a *SyncGauge[V, O]) Callback(ctx context.Context, observeFunc func(value V, options ...metric.ObserveOption)) error { - a.observations.Range(func(key, value interface{}) bool { - attrs := key.(attribute.Set) - val := value.(*ObservationMapValue[V]) - observeFunc(val.Value, metric.WithAttributeSet(attrs)) - // If the pointer is still same we can safely delete, else it means that the value was overwritten in parallel - a.observations.CompareAndDelete(attrs, val) - return true - }) - return nil -} - -func (a *SyncGauge[V, O]) Set(input V, attrs attribute.Set) { - val := ObservationMapValue[V]{Value: input} - a.observations.Store(attrs, &val) -} - -type Int64SyncGauge struct { - syncGauge *SyncGauge[int64, metric.Int64Observable] -} - -func (a *Int64SyncGauge) Set(input int64, attrs attribute.Set) { - if a == nil { - return - } - a.syncGauge.Set(input, attrs) -} - -func NewInt64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Int64ObservableGaugeOption) (*Int64SyncGauge, error) { - syncGauge := &SyncGauge[int64, metric.Int64Observable]{ - name: gaugeName, - } - observableGauge, err := meter.Int64ObservableGauge(gaugeName, - append(opts, metric.WithInt64Callback(func(ctx context.Context, observer metric.Int64Observer) error { - return syncGauge.Callback(ctx, func(value int64, options ...metric.ObserveOption) { - observer.Observe(value, options...) - }) - }))...) - if err != nil { - return nil, fmt.Errorf("failed to create Int64SyncGauge: %w", err) - } - syncGauge.observableGauge = observableGauge - return &Int64SyncGauge{syncGauge: syncGauge}, nil -} - -type Float64SyncGauge struct { - syncGauge *SyncGauge[float64, metric.Float64Observable] -} - -func (a *Float64SyncGauge) Set(input float64, attrs attribute.Set) { - if a == nil { - return - } - a.syncGauge.Set(input, attrs) -} - -func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Float64ObservableGaugeOption) (*Float64SyncGauge, error) { - syncGauge := &SyncGauge[float64, metric.Float64Observable]{ - name: gaugeName, - } - observableGauge, err := meter.Float64ObservableGauge(gaugeName, - append(opts, metric.WithFloat64Callback(func(ctx context.Context, observer metric.Float64Observer) error { - return syncGauge.Callback(ctx, func(value float64, options ...metric.ObserveOption) { - observer.Observe(value, options...) - }) - }))...) - if err != nil { - return nil, fmt.Errorf("failed to create Float64SyncGauge: %w", err) - } - syncGauge.observableGauge = observableGauge - return &Float64SyncGauge{syncGauge: syncGauge}, nil -} - -func GetOrInitInt64SyncGauge(meter metric.Meter, cache map[string]*Int64SyncGauge, name string, - opts ...metric.Int64ObservableGaugeOption, -) (*Int64SyncGauge, error) { +func GetOrInitInt64SyncGauge(meter metric.Meter, cache map[string]metric.Int64Gauge, name string, opts ...metric.Int64GaugeOption, +) (metric.Int64Gauge, error) { gauge, ok := cache[name] if !ok { var err error - gauge, err = NewInt64SyncGauge(meter, name, opts...) + gauge, err = meter.Int64Gauge(name, opts...) if err != nil { return nil, err } @@ -109,13 +18,12 @@ func GetOrInitInt64SyncGauge(meter metric.Meter, cache map[string]*Int64SyncGaug return gauge, nil } -func GetOrInitFloat64SyncGauge(meter metric.Meter, cache map[string]*Float64SyncGauge, - name string, opts ...metric.Float64ObservableGaugeOption, -) (*Float64SyncGauge, error) { +func GetOrInitFloat64SyncGauge(meter metric.Meter, cache map[string]metric.Float64Gauge, name string, opts ...metric.Float64GaugeOption, +) (metric.Float64Gauge, error) { gauge, ok := cache[name] if !ok { var err error - gauge, err = NewFloat64SyncGauge(meter, name, opts...) + gauge, err = meter.Float64Gauge(name, opts...) if err != nil { return nil, err }