diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 6834402b3b..e9dde4487c 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -26,6 +26,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/instrumentation/tracing" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/otel_metrics" "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_guages" @@ -55,6 +56,8 @@ func (a *FlowableActivity) CheckConnection( ctx context.Context, config *protos.SetupInput, ) (*CheckConnectionResult, error) { + ctx, span := tracing.Tracer().Start(ctx, "CheckConnection") + defer span.End() ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer) if err != nil { @@ -71,6 +74,8 @@ func (a *FlowableActivity) CheckConnection( } func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *protos.SetupInput) error { + ctx, span := tracing.Tracer().Start(ctx, "SetupMetadataTables") + defer span.End() ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer) if err != nil { @@ -90,6 +95,8 @@ func (a *FlowableActivity) EnsurePullability( ctx context.Context, config *protos.EnsurePullabilityBatchInput, ) (*protos.EnsurePullabilityBatchOutput, error) { + ctx, span := tracing.Tracer().Start(ctx, "EnsurePullability") + defer span.End() ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) srcConn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) if err != nil { @@ -274,6 +281,8 @@ func (a *FlowableActivity) SyncRecords( options *protos.SyncFlowOptions, sessionID string, ) (*model.SyncResponse, error) { + ctx, span := tracing.Tracer().Start(ctx, "SyncRecords") + defer span.End() return syncCore(ctx, a, config, options, sessionID, connectors.CDCPullConnector.PullRecords, connectors.CDCSyncConnector.SyncRecords) diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 5b010916db..94e8dafc35 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -89,9 +89,10 @@ func killExistingScheduleFlows( func APIMain(ctx context.Context, args *APIServerParams) error { clientOptions := client.Options{ - HostPort: args.TemporalHostPort, - Namespace: args.TemporalNamespace, - Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))), + HostPort: args.TemporalHostPort, + Namespace: args.TemporalNamespace, + Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))), + Interceptors: GetTemporalClientInterceptors(), } if args.TemporalCert != "" && args.TemporalKey != "" { slog.Info("Using temporal certificate/key for authentication") diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index ee49dbc039..912901499a 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -27,9 +27,10 @@ type SnapshotWorkerOptions struct { func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Worker, error) { clientOptions := client.Options{ - HostPort: opts.TemporalHostPort, - Namespace: opts.TemporalNamespace, - Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))), + HostPort: opts.TemporalHostPort, + Namespace: opts.TemporalNamespace, + Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))), + Interceptors: GetTemporalClientInterceptors(), } if opts.TemporalCert != "" && opts.TemporalKey != "" { diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 9ddeb9306e..fba3e400a0 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -87,9 +87,10 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { } clientOptions := client.Options{ - HostPort: opts.TemporalHostPort, - Namespace: opts.TemporalNamespace, - Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))), + HostPort: opts.TemporalHostPort, + Namespace: opts.TemporalNamespace, + Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))), + Interceptors: GetTemporalClientInterceptors(), } if opts.TemporalCert != "" && opts.TemporalKey != "" { @@ -117,7 +118,6 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { return nil, fmt.Errorf("unable to create Temporal client: %w", err) } slog.Info("Created temporal client") - taskQueue := peerdbenv.PeerFlowTaskQueueName(shared.PeerFlowTaskQueue) slog.Info( fmt.Sprintf("Creating temporal worker for queue %v: %v workflow workers %v activity workers", diff --git a/flow/cmd/worker_common.go b/flow/cmd/worker_common.go new file mode 100644 index 0000000000..6c4914dca1 --- /dev/null +++ b/flow/cmd/worker_common.go @@ -0,0 +1,18 @@ +package cmd + +import ( + "log" + + "go.temporal.io/sdk/contrib/opentelemetry" + "go.temporal.io/sdk/interceptor" +) + +func GetTemporalClientInterceptors() []interceptor.ClientInterceptor { + // Maybe do this only when otel is actually enabled? + tracingInterceptor, err := opentelemetry.NewTracingInterceptor(opentelemetry.TracerOptions{}) + if err != nil { + log.Printf("failed to create tracing interceptor: %v\n", err) + return nil + } + return []interceptor.ClientInterceptor{tracingInterceptor} +} diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 3d09e82fe7..c73f4cb76e 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -6,6 +6,7 @@ import ( "log/slog" "github.com/jackc/pgx/v5/pgxpool" + "go.opencensus.io/trace" "github.com/PeerDB-io/peer-flow/alerting" connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery" @@ -267,6 +268,8 @@ func GetAs[T Connector](ctx context.Context, config *protos.Peer) (T, error) { } func GetCDCPullConnector(ctx context.Context, config *protos.Peer) (CDCPullConnector, error) { + ctx, span := trace.StartSpan(ctx, "connectors.GetCDCPullConnector") + defer span.End() return GetAs[CDCPullConnector](ctx, config) } diff --git a/flow/go.mod b/flow/go.mod index 380fd5ddb1..64ddfaf764 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -53,11 +53,16 @@ require ( go.opentelemetry.io/otel v1.26.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 go.opentelemetry.io/otel/metric v1.26.0 go.opentelemetry.io/otel/sdk v1.26.0 go.opentelemetry.io/otel/sdk/metric v1.26.0 + go.opentelemetry.io/otel/trace v1.26.0 go.temporal.io/api v1.33.0 go.temporal.io/sdk v1.26.1 + go.temporal.io/sdk/contrib/opentelemetry v0.5.0 go.uber.org/automaxprocs v1.5.3 golang.org/x/crypto v0.23.0 golang.org/x/mod v0.17.0 @@ -112,12 +117,11 @@ require ( github.com/prometheus/common v0.53.0 // indirect github.com/prometheus/procfs v0.15.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect - github.com/segmentio/asm v1.2.0 // indirect + github.com/segmentio/asm v1.2.0 // indirect; indirectianian github.com/sirupsen/logrus v1.9.3 // indirect github.com/twmb/franz-go/pkg/kmsg v1.8.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect - go.opentelemetry.io/otel/trace v1.26.0 // indirect go.opentelemetry.io/proto/otlp v1.2.0 // indirect golang.org/x/term v0.20.0 // indirect ) @@ -173,7 +177,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect - go.opencensus.io v0.24.0 // indirect + go.opencensus.io v0.24.0 golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 golang.org/x/net v0.25.0 // indirect golang.org/x/oauth2 v0.20.0 // indirect diff --git a/flow/go.sum b/flow/go.sum index 5fba5c2587..b7d1251f54 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -429,6 +429,12 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0 h1:+hm go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0/go.mod h1:NjC8142mLvvNT6biDpaMjyz78kyEHIwAJlSX0N9P5KI= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0 h1:HGZWGmCVRCVyAs2GQaiHQPbDHo+ObFWeUEOd+zDnp64= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0/go.mod h1:SaH+v38LSCHddyk7RGlU9uZyQoRrKao6IBnJw6Kbn+c= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0 h1:1u/AyyOqAWzy+SkPxDpahCNZParHV8Vid1RnI2clyDE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0/go.mod h1:z46paqbJ9l7c9fIPCXTqTGwhQZ5XoTIsfeFYWboizjs= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0 h1:Waw9Wfpo/IXzOI8bCB7DIk+0JZcqqsyn1JFnAc+iam8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0/go.mod h1:wnJIG4fOqyynOnnQF/eQb4/16VlX2EJAHhHgqIqWfAo= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 h1:1wp/gyxsuYtuE/JFxsQRtcCDtMrO2qMvlfXALU5wkzI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0/go.mod h1:gbTHmghkGgqxMomVQQMur1Nba4M0MQ8AYThXDUjsJ38= go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30= go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4= go.opentelemetry.io/otel/sdk v1.26.0 h1:Y7bumHf5tAiDlRYFmGqetNcLaVUZmh4iYfmGxtmz7F8= @@ -443,10 +449,14 @@ go.temporal.io/api v1.33.0 h1:UdvRZAwXcR8WMY6nmQ+kDqKMmYPSisIVkoAPX1r+wkM= go.temporal.io/api v1.33.0/go.mod h1:fgj/okGTIJVbheu3wSJjfEsSOcH/LzihaETvogVDh/c= go.temporal.io/sdk v1.26.1 h1:ggmFBythnuuW3yQRp0VzOTrmbOf+Ddbe00TZl+CQ+6U= go.temporal.io/sdk v1.26.1/go.mod h1:ph3K/74cry+JuSV9nJH+Q+Zeir2ddzoX2LjWL/e5yCo= +go.temporal.io/sdk/contrib/opentelemetry v0.5.0 h1:SOcS5VD7lWU+zwtY9PITn5nXLlSywgVzl5A7kWwQ6kI= +go.temporal.io/sdk/contrib/opentelemetry v0.5.0/go.mod h1:zJF/95YTBlTnsnMHLKiZzMFN76LnuTTGC7juBS7NeBY= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/flow/instrumentation/otel_common/resource.go b/flow/instrumentation/otel_common/resource.go new file mode 100644 index 0000000000..4b51759deb --- /dev/null +++ b/flow/instrumentation/otel_common/resource.go @@ -0,0 +1,17 @@ +package otel_common + +import ( + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +// NewOtelResource returns a resource describing this application. +func NewOtelResource(otelServiceName string) (*resource.Resource, error) { + return resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String(otelServiceName), + ), + ) +} diff --git a/flow/instrumentation/setup.go b/flow/instrumentation/setup.go new file mode 100644 index 0000000000..8c879c24a6 --- /dev/null +++ b/flow/instrumentation/setup.go @@ -0,0 +1,34 @@ +package instrumentation + +import ( + "context" + "errors" + + "github.com/PeerDB-io/peer-flow/instrumentation/tracing" +) + +type Config struct { + EnableTracing bool +} + +func SetupInstrumentation(ctx context.Context, serviceName string, config Config) (func(ctx context.Context) error, error) { + var shutdownFuncs []func(context.Context) error + shutdown := func(ctx context.Context) error { + var err error + for _, fn := range shutdownFuncs { + err = errors.Join(err, fn(ctx)) + } + shutdownFuncs = nil + return err + } + + if config.EnableTracing { + traceShutdown, err := tracing.SetupOtelTraceProviderExporter(serviceName) + if err != nil { + return shutdown, errors.Join(err, shutdown(ctx)) + } + shutdownFuncs = append(shutdownFuncs, traceShutdown) + } + // Setup other stuff here in the future like metrics, logs etc + return shutdown, nil +} diff --git a/flow/instrumentation/tracing/setup.go b/flow/instrumentation/tracing/setup.go new file mode 100644 index 0000000000..01850312f2 --- /dev/null +++ b/flow/instrumentation/tracing/setup.go @@ -0,0 +1,76 @@ +package tracing + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + + "github.com/PeerDB-io/peer-flow/instrumentation/otel_common" + "github.com/PeerDB-io/peer-flow/peerdbenv" +) + +// PeerDBSpanProcessor adds PeerDB specific attributes to spans like deploymentUID +type PeerDBSpanProcessor struct{} + +func (p *PeerDBSpanProcessor) OnStart(parent context.Context, s sdktrace.ReadWriteSpan) { + s.SetAttributes(attribute.String("deploymentUID", peerdbenv.PeerDBDeploymentUID())) +} + +func (p *PeerDBSpanProcessor) OnEnd(s sdktrace.ReadOnlySpan) { +} + +func (p *PeerDBSpanProcessor) Shutdown(ctx context.Context) error { + return nil +} + +func (p *PeerDBSpanProcessor) ForceFlush(ctx context.Context) error { + return nil +} + +func NewPeerDBSpanProcessor() sdktrace.SpanProcessor { + return &PeerDBSpanProcessor{} +} + +func setupHttpOtelTraceExporter() (*otlptrace.Exporter, error) { + return otlptracehttp.New(context.Background()) +} + +func setupGrpcOtelTraceExporter() (*otlptrace.Exporter, error) { + return otlptracegrpc.New(context.Background()) +} + +func SetupOtelTraceProviderExporter(otelServiceName string) (func(ctx context.Context) error, error) { + otlpTraceProtocol := peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_PROTOCOL", + peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", "http/protobuf")) + var traceExporter *otlptrace.Exporter + var err error + switch otlpTraceProtocol { + case "http/protobuf": + traceExporter, err = setupHttpOtelTraceExporter() + case "grpc": + traceExporter, err = setupGrpcOtelTraceExporter() + default: + return nil, fmt.Errorf("unsupported otel trace protocol: %s", otlpTraceProtocol) + } + if err != nil { + return nil, fmt.Errorf("failed to create OpenTelemetry trace exporter: %w", err) + } + otelResource, err := otel_common.NewOtelResource(otelServiceName) + if err != nil { + return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + } + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(traceExporter), + sdktrace.WithResource(otelResource), + ) + // This sets up the trace provider globally, now a tracer can be retrieved via tracer.Tracer() + otel.SetTracerProvider(tracerProvider) + tracerProvider.RegisterSpanProcessor(NewPeerDBSpanProcessor()) + return traceExporter.Shutdown, nil +} diff --git a/flow/instrumentation/tracing/tracer.go b/flow/instrumentation/tracing/tracer.go new file mode 100644 index 0000000000..79d5dd7d67 --- /dev/null +++ b/flow/instrumentation/tracing/tracer.go @@ -0,0 +1,26 @@ +package tracing + +import ( + "sync" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" +) + +var tracerName = "peerdb" + +var globalTracerOnce sync.Once + +var tracer trace.Tracer + +func SetupTracer(name string) trace.Tracer { + globalTracerOnce.Do(func() { + tracerName = name + tracer = otel.GetTracerProvider().Tracer(tracerName) + }) + return tracer +} + +func Tracer() trace.Tracer { + return SetupTracer("peerdb") +} diff --git a/flow/main.go b/flow/main.go index 3b1696161b..2b649bfb6d 100644 --- a/flow/main.go +++ b/flow/main.go @@ -14,6 +14,7 @@ import ( _ "go.uber.org/automaxprocs" "github.com/PeerDB-io/peer-flow/cmd" + "github.com/PeerDB-io/peer-flow/instrumentation" "github.com/PeerDB-io/peer-flow/logger" ) @@ -47,6 +48,14 @@ func main() { Usage: "Enable profiling for the application", Sources: cli.EnvVars("ENABLE_PROFILING"), } + + otelTracingFlag := &cli.BoolFlag{ + Name: "enable-otel-tracing", + Value: false, // Default is off + Usage: "Enable OpenTelemetry tracing for the application", + Sources: cli.EnvVars("ENABLE_OTEL_TRACING"), + } + otelMetricsFlag := &cli.BoolFlag{ Name: "enable-otel-metrics", Value: false, // Default is off @@ -82,6 +91,12 @@ func main() { Sources: cli.EnvVars("TEMPORAL_MAX_CONCURRENT_WORKFLOW_TASKS"), } + buildInstrumentationConfig := func(clicmd *cli.Command) instrumentation.Config { + return instrumentation.Config{ + EnableTracing: clicmd.Bool(otelTracingFlag.Name), + } + } + app := &cli.Command{ Name: "PeerDB Flows CLI", Commands: []*cli.Command{ @@ -89,6 +104,12 @@ func main() { Name: "worker", Action: func(ctx context.Context, clicmd *cli.Command) error { temporalHostPort := clicmd.String("temporal-host-port") + shutdownFunc, err := instrumentation.SetupInstrumentation(ctx, "flow-"+clicmd.Name, buildInstrumentationConfig(clicmd)) + if err != nil { + return err + } + // TODO make sure that this shutdown completes? + defer shutdownFunc(ctx) res, err := cmd.WorkerSetup(&cmd.WorkerSetupOptions{ TemporalHostPort: temporalHostPort, EnableProfiling: clicmd.Bool("enable-profiling"), @@ -122,6 +143,11 @@ func main() { Name: "snapshot-worker", Action: func(ctx context.Context, clicmd *cli.Command) error { temporalHostPort := clicmd.String("temporal-host-port") + shutdownFunc, err := instrumentation.SetupInstrumentation(ctx, "flow-"+clicmd.Name, buildInstrumentationConfig(clicmd)) + if err != nil { + return err + } + defer shutdownFunc(ctx) c, w, err := cmd.SnapshotWorkerMain(&cmd.SnapshotWorkerOptions{ TemporalHostPort: temporalHostPort, TemporalNamespace: clicmd.String("temporal-namespace"), @@ -161,7 +187,11 @@ func main() { }, Action: func(ctx context.Context, clicmd *cli.Command) error { temporalHostPort := clicmd.String("temporal-host-port") - + shutdownFunc, err := instrumentation.SetupInstrumentation(ctx, "flow-"+clicmd.Name, buildInstrumentationConfig(clicmd)) + if err != nil { + return err + } + defer shutdownFunc(ctx) return cmd.APIMain(ctx, &cmd.APIServerParams{ Port: uint16(clicmd.Uint("port")), TemporalHostPort: temporalHostPort, @@ -173,6 +203,9 @@ func main() { }, }, }, + Flags: []cli.Flag{ + otelTracingFlag, + }, } go func() { diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go index 25834b6763..0cd1f20aa3 100644 --- a/flow/otel_metrics/otel_manager.go +++ b/flow/otel_metrics/otel_manager.go @@ -8,9 +8,8 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/metric" sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - semconv "go.opentelemetry.io/otel/semconv/v1.24.0" + "github.com/PeerDB-io/peer-flow/instrumentation/otel_common" "github.com/PeerDB-io/peer-flow/peerdbenv" ) @@ -21,19 +20,6 @@ type OtelManager struct { Int64GaugesCache map[string]*Int64SyncGauge } -// newOtelResource returns a resource describing this application. -func newOtelResource(otelServiceName string) (*resource.Resource, error) { - r, err := resource.Merge( - resource.Default(), - resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceNameKey.String(otelServiceName), - ), - ) - - return r, err -} - func setupHttpOtelMetricsExporter() (sdkmetric.Exporter, error) { return otlpmetrichttp.New(context.Background()) } @@ -58,7 +44,7 @@ func SetupOtelMetricsExporter(otelServiceName string) (*sdkmetric.MeterProvider, if err != nil { return nil, fmt.Errorf("failed to create OpenTelemetry metrics exporter: %w", err) } - otelResource, err := newOtelResource(otelServiceName) + otelResource, err := otel_common.NewOtelResource(otelServiceName) if err != nil { return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) }