Skip to content

Commit

Permalink
OTEL for persistence (#7050)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->

Adds (basic) OTEL tracing for all persistence requests:

<img width="1611" alt="Screenshot 2025-01-06 at 5 32 26 PM"
src="https://github.com/user-attachments/assets/60badc87-c62d-4af7-8461-f3794afbe3fc"
/>

## Why?
<!-- Tell your future self why have you made these changes -->

For improved observability during development.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

By running `make OTEL=true start` and seeing the results in Grafana.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

OTEL is off by default, no behavior change is expected.

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
stephanos authored Jan 9, 2025
1 parent 8ee9943 commit 1b4c7f4
Show file tree
Hide file tree
Showing 17 changed files with 1,638 additions and 11 deletions.
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ $(STAMPDIR)/goimports-$(GOIMPORTS_VER): | $(STAMPDIR) $(LOCALBIN)
@touch $@
$(GOIMPORTS): $(STAMPDIR)/goimports-$(GOIMPORTS_VER)

GOWRAP_VER := v1.4.1
GOWRAP := $(LOCALBIN)/gowrap
$(STAMPDIR)/gowrap-$(GOWRAP_VER): | $(STAMPDIR) $(LOCALBIN)
$(call go-install-tool,$(GOWRAP),github.com/hexdigest/gowrap/cmd/gowrap,$(GOWRAP_VER))
@touch $@
$(GOWRAP): $(STAMPDIR)/gowrap-$(GOWRAP_VER)

# Mockgen is called by name throughout the codebase, so we need to keep the binary name consistent
MOCKGEN_VER := v0.4.0
MOCKGEN := $(LOCALBIN)/mockgen
Expand Down Expand Up @@ -601,9 +608,10 @@ update-dependencies:
@go get -u -t $(PINNED_DEPENDENCIES) ./...
@go mod tidy

go-generate: $(MOCKGEN) $(GOIMPORTS) $(STRINGER)
go-generate: $(MOCKGEN) $(GOIMPORTS) $(STRINGER) $(GOWRAP)
@printf $(COLOR) "Process go:generate directives..."
@go generate ./...
$(MAKE) copyright

ensure-no-changes:
@printf $(COLOR) "Check for local changes..."
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/client/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package client
import (
"time"

"go.opentelemetry.io/otel/trace"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/dynamicconfig"
Expand All @@ -37,9 +38,11 @@ import (
"go.temporal.io/server/common/persistence/faultinjection"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/persistence/telemetry"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/resolver"
otel "go.temporal.io/server/common/telemetry"
"go.uber.org/fx"
)

Expand Down Expand Up @@ -182,6 +185,7 @@ func DataStoreFactoryProvider(
abstractDataStoreFactory AbstractDataStoreFactory,
logger log.Logger,
metricsHandler metrics.Handler,
tracerProvider trace.TracerProvider,
) persistence.DataStoreFactory {

var dataStoreFactory persistence.DataStoreFactory
Expand All @@ -201,6 +205,10 @@ func DataStoreFactoryProvider(
dataStoreFactory = faultinjection.NewFaultInjectionDatastoreFactory(defaultStoreCfg.FaultInjection, dataStoreFactory)
}

if otel.IsEnabled(tracerProvider) {
dataStoreFactory = telemetry.NewTelemetryDataStoreFactory(dataStoreFactory, tracerProvider.Tracer("persistence"))
}

return dataStoreFactory
}

Expand Down
5 changes: 5 additions & 0 deletions common/persistence/persistence-tests/persistence_test_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"time"

"github.com/stretchr/testify/suite"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
persistencespb "go.temporal.io/server/api/persistence/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -107,6 +109,7 @@ type (
ReplicationReadLevel int64
DefaultTestCluster PersistenceTestCluster
Logger log.Logger
TracerProvider trace.TracerProvider
}

// PersistenceTestCluster exposes management operations on a database
Expand Down Expand Up @@ -191,6 +194,7 @@ func NewTestBaseForCluster(testCluster PersistenceTestCluster, logger log.Logger
return &TestBase{
DefaultTestCluster: testCluster,
Logger: logger,
TracerProvider: noop.NewTracerProvider(),
}
}

Expand All @@ -217,6 +221,7 @@ func (s *TestBase) Setup(clusterMetadataConfig *cluster.Config) {
s.AbstractDataStoreFactory,
s.Logger,
metrics.NoopMetricsHandler,
s.TracerProvider,
)
factory := client.NewFactory(
dataStoreFactory,
Expand Down
143 changes: 143 additions & 0 deletions common/persistence/telemetry/cluster_metadata_store_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

148 changes: 148 additions & 0 deletions common/persistence/telemetry/data_store_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package telemetry

import (
"go.opentelemetry.io/otel/trace"
"go.temporal.io/server/common/persistence"
)

type (
TelemetryDataStoreFactory struct {
baseFactory persistence.DataStoreFactory
tracer trace.Tracer

taskStore persistence.TaskStore
shardStore persistence.ShardStore
metadataStore persistence.MetadataStore
executionStore persistence.ExecutionStore
queue persistence.Queue
queueV2 persistence.QueueV2
clusterMDStore persistence.ClusterMetadataStore
nexusEndpointStore persistence.NexusEndpointStore
}
)

func NewTelemetryDataStoreFactory(
baseFactory persistence.DataStoreFactory,
tracer trace.Tracer,
) *TelemetryDataStoreFactory {
return &TelemetryDataStoreFactory{
baseFactory: baseFactory,
tracer: tracer,
}
}

func (d *TelemetryDataStoreFactory) Close() {
d.baseFactory.Close()
}

func (d *TelemetryDataStoreFactory) NewTaskStore() (persistence.TaskStore, error) {
if d.taskStore == nil {
baseStore, err := d.baseFactory.NewTaskStore()
if err != nil {
return nil, err
}
d.taskStore = newTelemetryTaskStore(baseStore, d.tracer)
}
return d.taskStore, nil
}

func (d *TelemetryDataStoreFactory) NewShardStore() (persistence.ShardStore, error) {
if d.shardStore == nil {
baseStore, err := d.baseFactory.NewShardStore()
if err != nil {
return nil, err
}
d.shardStore = newTelemetryShardStore(baseStore, d.tracer)
}
return d.shardStore, nil
}

func (d *TelemetryDataStoreFactory) NewMetadataStore() (persistence.MetadataStore, error) {
if d.metadataStore == nil {
baseStore, err := d.baseFactory.NewMetadataStore()
if err != nil {
return nil, err
}
d.metadataStore = newTelemetryMetadataStore(baseStore, d.tracer)
}
return d.metadataStore, nil
}

func (d *TelemetryDataStoreFactory) NewExecutionStore() (persistence.ExecutionStore, error) {
if d.executionStore == nil {
baseStore, err := d.baseFactory.NewExecutionStore()
if err != nil {
return nil, err
}
d.executionStore = newTelemetryExecutionStore(baseStore, d.tracer)
}
return d.executionStore, nil
}

func (d *TelemetryDataStoreFactory) NewQueue(queueType persistence.QueueType) (persistence.Queue, error) {
if d.queue == nil {
baseQueue, err := d.baseFactory.NewQueue(queueType)
if err != nil {
return baseQueue, err
}
d.queue = newTelemetryQueue(baseQueue, d.tracer)
}
return d.queue, nil
}

func (d *TelemetryDataStoreFactory) NewQueueV2() (persistence.QueueV2, error) {
if d.queueV2 == nil {
baseQueue, err := d.baseFactory.NewQueueV2()
if err != nil {
return baseQueue, err
}
d.queueV2 = newTelemetryQueueV2(baseQueue, d.tracer)
}
return d.queueV2, nil
}

func (d *TelemetryDataStoreFactory) NewClusterMetadataStore() (persistence.ClusterMetadataStore, error) {
if d.clusterMDStore == nil {
baseStore, err := d.baseFactory.NewClusterMetadataStore()
if err != nil {
return nil, err
}
d.clusterMDStore = newTelemetryClusterMetadataStore(baseStore, d.tracer)
}
return d.clusterMDStore, nil
}

func (d *TelemetryDataStoreFactory) NewNexusEndpointStore() (persistence.NexusEndpointStore, error) {
if d.nexusEndpointStore == nil {
baseStore, err := d.baseFactory.NewNexusEndpointStore()
if err != nil {
return nil, err
}
d.nexusEndpointStore = newTelemetryNexusEndpointStore(baseStore, d.tracer)
}
return d.nexusEndpointStore, nil
}
Loading

0 comments on commit 1b4c7f4

Please sign in to comment.