diff --git a/common/persistence/persistence_interface.go b/common/persistence/persistence_interface.go index 57eae2725d8..5002126abbe 100644 --- a/common/persistence/persistence_interface.go +++ b/common/persistence/persistence_interface.go @@ -232,8 +232,8 @@ type ( // create the shard with the returned value. InternalGetOrCreateShardRequest struct { ShardID int32 - CreateShardInfo func() (rangeID int64, shardInfo *commonpb.DataBlob, err error) - LifecycleContext context.Context // cancelled when shard is unloaded + CreateShardInfo func() (rangeID int64, shardInfo *commonpb.DataBlob, err error) `json:"-"` + LifecycleContext context.Context // cancelled when shard is unloaded } // InternalGetOrCreateShardResponse is the response to GetShard diff --git a/common/persistence/telemetry/cluster_metadata_store_gen.go b/common/persistence/telemetry/cluster_metadata_store_gen.go index bd7a5ef160e..d55682778aa 100644 --- a/common/persistence/telemetry/cluster_metadata_store_gen.go +++ b/common/persistence/telemetry/cluster_metadata_store_gen.go @@ -32,15 +32,20 @@ package telemetry import ( "context" + "encoding/json" + "fmt" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" _sourcePersistence "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/telemetry" ) // telemetryClusterMetadataStore implements ClusterMetadataStore interface instrumented with OpenTelemetry. type telemetryClusterMetadataStore struct { _sourcePersistence.ClusterMetadataStore - tracer trace.Tracer + tracer trace.Tracer + debugMode bool } // newTelemetryClusterMetadataStore returns telemetryClusterMetadataStore. @@ -48,6 +53,7 @@ func newTelemetryClusterMetadataStore(base _sourcePersistence.ClusterMetadataSto return telemetryClusterMetadataStore{ ClusterMetadataStore: base, tracer: tracer, + debugMode: telemetry.DebugMode(), } } @@ -56,11 +62,25 @@ func (d telemetryClusterMetadataStore) DeleteClusterMetadata(ctx context.Context ctx, span := d.tracer.Start(ctx, "persistence.ClusterMetadataStore/DeleteClusterMetadata") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ClusterMetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("DeleteClusterMetadata")) + err = d.ClusterMetadataStore.DeleteClusterMetadata(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalDeleteClusterMetadataRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -69,11 +89,32 @@ func (d telemetryClusterMetadataStore) GetClusterMembers(ctx context.Context, re ctx, span := d.tracer.Start(ctx, "persistence.ClusterMetadataStore/GetClusterMembers") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ClusterMetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetClusterMembers")) + gp1, err = d.ClusterMetadataStore.GetClusterMembers(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.GetClusterMembersRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -82,11 +123,32 @@ func (d telemetryClusterMetadataStore) GetClusterMetadata(ctx context.Context, r ctx, span := d.tracer.Start(ctx, "persistence.ClusterMetadataStore/GetClusterMetadata") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ClusterMetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetClusterMetadata")) + ip1, err = d.ClusterMetadataStore.GetClusterMetadata(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalGetClusterMetadataRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -95,11 +157,32 @@ func (d telemetryClusterMetadataStore) ListClusterMetadata(ctx context.Context, ctx, span := d.tracer.Start(ctx, "persistence.ClusterMetadataStore/ListClusterMetadata") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ClusterMetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("ListClusterMetadata")) + ip1, err = d.ClusterMetadataStore.ListClusterMetadata(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalListClusterMetadataRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -108,11 +191,25 @@ func (d telemetryClusterMetadataStore) PruneClusterMembership(ctx context.Contex ctx, span := d.tracer.Start(ctx, "persistence.ClusterMetadataStore/PruneClusterMembership") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ClusterMetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("PruneClusterMembership")) + err = d.ClusterMetadataStore.PruneClusterMembership(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.PruneClusterMembershipRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -121,11 +218,32 @@ func (d telemetryClusterMetadataStore) SaveClusterMetadata(ctx context.Context, ctx, span := d.tracer.Start(ctx, "persistence.ClusterMetadataStore/SaveClusterMetadata") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ClusterMetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("SaveClusterMetadata")) + b1, err = d.ClusterMetadataStore.SaveClusterMetadata(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalSaveClusterMetadataRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -134,10 +252,24 @@ func (d telemetryClusterMetadataStore) UpsertClusterMembership(ctx context.Conte ctx, span := d.tracer.Start(ctx, "persistence.ClusterMetadataStore/UpsertClusterMembership") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ClusterMetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("UpsertClusterMembership")) + err = d.ClusterMetadataStore.UpsertClusterMembership(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.UpsertClusterMembershipRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } diff --git a/common/persistence/telemetry/execution_store_gen.go b/common/persistence/telemetry/execution_store_gen.go index 6b4ecd523a5..47b431ba92b 100644 --- a/common/persistence/telemetry/execution_store_gen.go +++ b/common/persistence/telemetry/execution_store_gen.go @@ -32,15 +32,20 @@ package telemetry import ( "context" + "encoding/json" + "fmt" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" _sourcePersistence "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/telemetry" ) // telemetryExecutionStore implements ExecutionStore interface instrumented with OpenTelemetry. type telemetryExecutionStore struct { _sourcePersistence.ExecutionStore - tracer trace.Tracer + tracer trace.Tracer + debugMode bool } // newTelemetryExecutionStore returns telemetryExecutionStore. @@ -48,6 +53,7 @@ func newTelemetryExecutionStore(base _sourcePersistence.ExecutionStore, tracer t return telemetryExecutionStore{ ExecutionStore: base, tracer: tracer, + debugMode: telemetry.DebugMode(), } } @@ -56,11 +62,25 @@ func (d telemetryExecutionStore) AddHistoryTasks(ctx context.Context, request *_ ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/AddHistoryTasks") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("AddHistoryTasks")) + err = d.ExecutionStore.AddHistoryTasks(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalAddHistoryTasksRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -69,11 +89,25 @@ func (d telemetryExecutionStore) AppendHistoryNodes(ctx context.Context, request ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/AppendHistoryNodes") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("AppendHistoryNodes")) + err = d.ExecutionStore.AppendHistoryNodes(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalAppendHistoryNodesRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -82,11 +116,25 @@ func (d telemetryExecutionStore) CompleteHistoryTask(ctx context.Context, reques ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/CompleteHistoryTask") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("CompleteHistoryTask")) + err = d.ExecutionStore.CompleteHistoryTask(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.CompleteHistoryTaskRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -95,11 +143,25 @@ func (d telemetryExecutionStore) ConflictResolveWorkflowExecution(ctx context.Co ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/ConflictResolveWorkflowExecution") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("ConflictResolveWorkflowExecution")) + err = d.ExecutionStore.ConflictResolveWorkflowExecution(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalConflictResolveWorkflowExecutionRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -108,11 +170,32 @@ func (d telemetryExecutionStore) CreateWorkflowExecution(ctx context.Context, re ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/CreateWorkflowExecution") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("CreateWorkflowExecution")) + ip1, err = d.ExecutionStore.CreateWorkflowExecution(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalCreateWorkflowExecutionRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -121,11 +204,25 @@ func (d telemetryExecutionStore) DeleteCurrentWorkflowExecution(ctx context.Cont ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/DeleteCurrentWorkflowExecution") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("DeleteCurrentWorkflowExecution")) + err = d.ExecutionStore.DeleteCurrentWorkflowExecution(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.DeleteCurrentWorkflowExecutionRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -134,11 +231,25 @@ func (d telemetryExecutionStore) DeleteHistoryBranch(ctx context.Context, reques ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/DeleteHistoryBranch") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("DeleteHistoryBranch")) + err = d.ExecutionStore.DeleteHistoryBranch(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalDeleteHistoryBranchRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -147,11 +258,25 @@ func (d telemetryExecutionStore) DeleteHistoryNodes(ctx context.Context, request ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/DeleteHistoryNodes") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("DeleteHistoryNodes")) + err = d.ExecutionStore.DeleteHistoryNodes(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalDeleteHistoryNodesRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -160,11 +285,25 @@ func (d telemetryExecutionStore) DeleteReplicationTaskFromDLQ(ctx context.Contex ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/DeleteReplicationTaskFromDLQ") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("DeleteReplicationTaskFromDLQ")) + err = d.ExecutionStore.DeleteReplicationTaskFromDLQ(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.DeleteReplicationTaskFromDLQRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -173,11 +312,25 @@ func (d telemetryExecutionStore) DeleteWorkflowExecution(ctx context.Context, re ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/DeleteWorkflowExecution") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("DeleteWorkflowExecution")) + err = d.ExecutionStore.DeleteWorkflowExecution(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.DeleteWorkflowExecutionRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -186,11 +339,25 @@ func (d telemetryExecutionStore) ForkHistoryBranch(ctx context.Context, request ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/ForkHistoryBranch") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("ForkHistoryBranch")) + err = d.ExecutionStore.ForkHistoryBranch(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalForkHistoryBranchRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -199,11 +366,32 @@ func (d telemetryExecutionStore) GetAllHistoryTreeBranches(ctx context.Context, ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/GetAllHistoryTreeBranches") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetAllHistoryTreeBranches")) + ip1, err = d.ExecutionStore.GetAllHistoryTreeBranches(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.GetAllHistoryTreeBranchesRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -212,11 +400,32 @@ func (d telemetryExecutionStore) GetCurrentExecution(ctx context.Context, reques ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/GetCurrentExecution") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetCurrentExecution")) + ip1, err = d.ExecutionStore.GetCurrentExecution(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.GetCurrentExecutionRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -225,11 +434,32 @@ func (d telemetryExecutionStore) GetHistoryTasks(ctx context.Context, request *_ ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/GetHistoryTasks") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetHistoryTasks")) + ip1, err = d.ExecutionStore.GetHistoryTasks(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.GetHistoryTasksRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -238,11 +468,32 @@ func (d telemetryExecutionStore) GetHistoryTreeContainingBranch(ctx context.Cont ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/GetHistoryTreeContainingBranch") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetHistoryTreeContainingBranch")) + ip1, err = d.ExecutionStore.GetHistoryTreeContainingBranch(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalGetHistoryTreeContainingBranchRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -251,11 +502,32 @@ func (d telemetryExecutionStore) GetReplicationTasksFromDLQ(ctx context.Context, ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/GetReplicationTasksFromDLQ") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetReplicationTasksFromDLQ")) + ip1, err = d.ExecutionStore.GetReplicationTasksFromDLQ(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.GetReplicationTasksFromDLQRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -264,11 +536,32 @@ func (d telemetryExecutionStore) GetWorkflowExecution(ctx context.Context, reque ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/GetWorkflowExecution") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetWorkflowExecution")) + ip1, err = d.ExecutionStore.GetWorkflowExecution(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.GetWorkflowExecutionRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -277,11 +570,32 @@ func (d telemetryExecutionStore) IsReplicationDLQEmpty(ctx context.Context, requ ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/IsReplicationDLQEmpty") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("IsReplicationDLQEmpty")) + b1, err = d.ExecutionStore.IsReplicationDLQEmpty(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.GetReplicationTasksFromDLQRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -290,11 +604,32 @@ func (d telemetryExecutionStore) ListConcreteExecutions(ctx context.Context, req ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/ListConcreteExecutions") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("ListConcreteExecutions")) + ip1, err = d.ExecutionStore.ListConcreteExecutions(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.ListConcreteExecutionsRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -303,11 +638,25 @@ func (d telemetryExecutionStore) PutReplicationTaskToDLQ(ctx context.Context, re ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/PutReplicationTaskToDLQ") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("PutReplicationTaskToDLQ")) + err = d.ExecutionStore.PutReplicationTaskToDLQ(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.PutReplicationTaskToDLQRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -316,11 +665,25 @@ func (d telemetryExecutionStore) RangeCompleteHistoryTasks(ctx context.Context, ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/RangeCompleteHistoryTasks") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("RangeCompleteHistoryTasks")) + err = d.ExecutionStore.RangeCompleteHistoryTasks(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.RangeCompleteHistoryTasksRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -329,11 +692,25 @@ func (d telemetryExecutionStore) RangeDeleteReplicationTaskFromDLQ(ctx context.C ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/RangeDeleteReplicationTaskFromDLQ") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("RangeDeleteReplicationTaskFromDLQ")) + err = d.ExecutionStore.RangeDeleteReplicationTaskFromDLQ(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.RangeDeleteReplicationTaskFromDLQRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -342,11 +719,32 @@ func (d telemetryExecutionStore) ReadHistoryBranch(ctx context.Context, request ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/ReadHistoryBranch") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("ReadHistoryBranch")) + ip1, err = d.ExecutionStore.ReadHistoryBranch(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalReadHistoryBranchRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -355,11 +753,25 @@ func (d telemetryExecutionStore) SetWorkflowExecution(ctx context.Context, reque ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/SetWorkflowExecution") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("SetWorkflowExecution")) + err = d.ExecutionStore.SetWorkflowExecution(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalSetWorkflowExecutionRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -368,10 +780,24 @@ func (d telemetryExecutionStore) UpdateWorkflowExecution(ctx context.Context, re ctx, span := d.tracer.Start(ctx, "persistence.ExecutionStore/UpdateWorkflowExecution") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ExecutionStore")) + span.SetAttributes(attribute.Key("persistence.method").String("UpdateWorkflowExecution")) + err = d.ExecutionStore.UpdateWorkflowExecution(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalUpdateWorkflowExecutionRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } diff --git a/common/persistence/telemetry/gowrap_template b/common/persistence/telemetry/gowrap_template index 1a73469295f..58c8892fbff 100644 --- a/common/persistence/telemetry/gowrap_template +++ b/common/persistence/telemetry/gowrap_template @@ -4,6 +4,7 @@ type {{$decorator}} struct { {{.Interface.Type}} tracer trace.Tracer + debugMode bool } // new{{upFirst $decorator}} returns {{$decorator}}. @@ -11,6 +12,7 @@ func new{{upFirst $decorator}} (base {{.Interface.Type}}, tracer trace.Tracer) { return {{$decorator}} { {{.Interface.Name}}: base, tracer: tracer, + debugMode: telemetry.DebugMode(), } } @@ -22,12 +24,37 @@ func new{{upFirst $decorator}} (base {{.Interface.Type}}, tracer trace.Tracer) { ctx, span := d.tracer.Start(ctx, "{{ printf "persistence.%s/%s" $.Interface.Name $method.Name }}") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("{{ $.Interface.Name }}")) + span.SetAttributes(attribute.Key("persistence.method").String("{{ $method.Name }}")) + {{$method.ResultsNames}} = d.{{$.Interface.Name}}.{{$method.Call}} {{- if $method.ReturnsError}} if err != nil { span.RecordError(err) } {{end}} + + if d.debugMode { + {{- if (gt (len $method.Params) 1) }} + {{ $request := (index $method.Params 1) }} + requestPayload, err := json.MarshalIndent({{ $request.Name }}, "", " ") + if err != nil { + fmt.Println("failed to serialize {{$request.Type}} for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + {{end}} + {{- if (gt (len $method.Results) 1) }} + {{ $result := (index $method.Results 1) }} + responsePayload, err := json.MarshalIndent({{ $result.Name }}, "", " ") + if err != nil { + fmt.Println("failed to serialize {{$result.Type}} for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + {{end}} + } + return } {{end}} diff --git a/common/persistence/telemetry/nexus_endpoint_store_gen.go b/common/persistence/telemetry/nexus_endpoint_store_gen.go index 7eccb91223c..2784c1c2955 100644 --- a/common/persistence/telemetry/nexus_endpoint_store_gen.go +++ b/common/persistence/telemetry/nexus_endpoint_store_gen.go @@ -32,15 +32,20 @@ package telemetry import ( "context" + "encoding/json" + "fmt" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" _sourcePersistence "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/telemetry" ) // telemetryNexusEndpointStore implements NexusEndpointStore interface instrumented with OpenTelemetry. type telemetryNexusEndpointStore struct { _sourcePersistence.NexusEndpointStore - tracer trace.Tracer + tracer trace.Tracer + debugMode bool } // newTelemetryNexusEndpointStore returns telemetryNexusEndpointStore. @@ -48,6 +53,7 @@ func newTelemetryNexusEndpointStore(base _sourcePersistence.NexusEndpointStore, return telemetryNexusEndpointStore{ NexusEndpointStore: base, tracer: tracer, + debugMode: telemetry.DebugMode(), } } @@ -56,11 +62,25 @@ func (d telemetryNexusEndpointStore) CreateOrUpdateNexusEndpoint(ctx context.Con ctx, span := d.tracer.Start(ctx, "persistence.NexusEndpointStore/CreateOrUpdateNexusEndpoint") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("NexusEndpointStore")) + span.SetAttributes(attribute.Key("persistence.method").String("CreateOrUpdateNexusEndpoint")) + err = d.NexusEndpointStore.CreateOrUpdateNexusEndpoint(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalCreateOrUpdateNexusEndpointRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -69,11 +89,25 @@ func (d telemetryNexusEndpointStore) DeleteNexusEndpoint(ctx context.Context, re ctx, span := d.tracer.Start(ctx, "persistence.NexusEndpointStore/DeleteNexusEndpoint") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("NexusEndpointStore")) + span.SetAttributes(attribute.Key("persistence.method").String("DeleteNexusEndpoint")) + err = d.NexusEndpointStore.DeleteNexusEndpoint(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.DeleteNexusEndpointRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -82,11 +116,32 @@ func (d telemetryNexusEndpointStore) GetNexusEndpoint(ctx context.Context, reque ctx, span := d.tracer.Start(ctx, "persistence.NexusEndpointStore/GetNexusEndpoint") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("NexusEndpointStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetNexusEndpoint")) + ip1, err = d.NexusEndpointStore.GetNexusEndpoint(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.GetNexusEndpointRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -95,10 +150,31 @@ func (d telemetryNexusEndpointStore) ListNexusEndpoints(ctx context.Context, req ctx, span := d.tracer.Start(ctx, "persistence.NexusEndpointStore/ListNexusEndpoints") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("NexusEndpointStore")) + span.SetAttributes(attribute.Key("persistence.method").String("ListNexusEndpoints")) + ip1, err = d.NexusEndpointStore.ListNexusEndpoints(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.ListNexusEndpointsRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } diff --git a/common/persistence/telemetry/queue_gen.go b/common/persistence/telemetry/queue_gen.go index 9931496f1fe..5b3ee66daa7 100644 --- a/common/persistence/telemetry/queue_gen.go +++ b/common/persistence/telemetry/queue_gen.go @@ -32,23 +32,29 @@ package telemetry import ( "context" + "encoding/json" + "fmt" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" commonpb "go.temporal.io/api/common/v1" _sourcePersistence "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/telemetry" ) // telemetryQueue implements Queue interface instrumented with OpenTelemetry. type telemetryQueue struct { _sourcePersistence.Queue - tracer trace.Tracer + tracer trace.Tracer + debugMode bool } // newTelemetryQueue returns telemetryQueue. func newTelemetryQueue(base _sourcePersistence.Queue, tracer trace.Tracer) telemetryQueue { return telemetryQueue{ - Queue: base, - tracer: tracer, + Queue: base, + tracer: tracer, + debugMode: telemetry.DebugMode(), } } @@ -57,11 +63,25 @@ func (d telemetryQueue) DeleteMessageFromDLQ(ctx context.Context, messageID int6 ctx, span := d.tracer.Start(ctx, "persistence.Queue/DeleteMessageFromDLQ") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("Queue")) + span.SetAttributes(attribute.Key("persistence.method").String("DeleteMessageFromDLQ")) + err = d.Queue.DeleteMessageFromDLQ(ctx, messageID) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(messageID, "", " ") + if err != nil { + fmt.Println("failed to serialize int64 for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -70,11 +90,25 @@ func (d telemetryQueue) DeleteMessagesBefore(ctx context.Context, messageID int6 ctx, span := d.tracer.Start(ctx, "persistence.Queue/DeleteMessagesBefore") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("Queue")) + span.SetAttributes(attribute.Key("persistence.method").String("DeleteMessagesBefore")) + err = d.Queue.DeleteMessagesBefore(ctx, messageID) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(messageID, "", " ") + if err != nil { + fmt.Println("failed to serialize int64 for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -83,11 +117,25 @@ func (d telemetryQueue) EnqueueMessage(ctx context.Context, blob *commonpb.DataB ctx, span := d.tracer.Start(ctx, "persistence.Queue/EnqueueMessage") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("Queue")) + span.SetAttributes(attribute.Key("persistence.method").String("EnqueueMessage")) + err = d.Queue.EnqueueMessage(ctx, blob) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(blob, "", " ") + if err != nil { + fmt.Println("failed to serialize *commonpb.DataBlob for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -96,11 +144,32 @@ func (d telemetryQueue) EnqueueMessageToDLQ(ctx context.Context, blob *commonpb. ctx, span := d.tracer.Start(ctx, "persistence.Queue/EnqueueMessageToDLQ") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("Queue")) + span.SetAttributes(attribute.Key("persistence.method").String("EnqueueMessageToDLQ")) + i1, err = d.Queue.EnqueueMessageToDLQ(ctx, blob) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(blob, "", " ") + if err != nil { + fmt.Println("failed to serialize *commonpb.DataBlob for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -109,11 +178,25 @@ func (d telemetryQueue) GetAckLevels(ctx context.Context) (ip1 *_sourcePersisten ctx, span := d.tracer.Start(ctx, "persistence.Queue/GetAckLevels") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("Queue")) + span.SetAttributes(attribute.Key("persistence.method").String("GetAckLevels")) + ip1, err = d.Queue.GetAckLevels(ctx) if err != nil { span.RecordError(err) } + if d.debugMode { + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -122,11 +205,25 @@ func (d telemetryQueue) GetDLQAckLevels(ctx context.Context) (ip1 *_sourcePersis ctx, span := d.tracer.Start(ctx, "persistence.Queue/GetDLQAckLevels") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("Queue")) + span.SetAttributes(attribute.Key("persistence.method").String("GetDLQAckLevels")) + ip1, err = d.Queue.GetDLQAckLevels(ctx) if err != nil { span.RecordError(err) } + if d.debugMode { + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -135,11 +232,25 @@ func (d telemetryQueue) Init(ctx context.Context, blob *commonpb.DataBlob) (err ctx, span := d.tracer.Start(ctx, "persistence.Queue/Init") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("Queue")) + span.SetAttributes(attribute.Key("persistence.method").String("Init")) + err = d.Queue.Init(ctx, blob) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(blob, "", " ") + if err != nil { + fmt.Println("failed to serialize *commonpb.DataBlob for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -148,11 +259,25 @@ func (d telemetryQueue) RangeDeleteMessagesFromDLQ(ctx context.Context, firstMes ctx, span := d.tracer.Start(ctx, "persistence.Queue/RangeDeleteMessagesFromDLQ") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("Queue")) + span.SetAttributes(attribute.Key("persistence.method").String("RangeDeleteMessagesFromDLQ")) + err = d.Queue.RangeDeleteMessagesFromDLQ(ctx, firstMessageID, lastMessageID) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(firstMessageID, "", " ") + if err != nil { + fmt.Println("failed to serialize int64 for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -161,11 +286,32 @@ func (d telemetryQueue) ReadMessages(ctx context.Context, lastMessageID int64, m ctx, span := d.tracer.Start(ctx, "persistence.Queue/ReadMessages") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("Queue")) + span.SetAttributes(attribute.Key("persistence.method").String("ReadMessages")) + qpa1, err = d.Queue.ReadMessages(ctx, lastMessageID, maxCount) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(lastMessageID, "", " ") + if err != nil { + fmt.Println("failed to serialize int64 for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -174,11 +320,32 @@ func (d telemetryQueue) ReadMessagesFromDLQ(ctx context.Context, firstMessageID ctx, span := d.tracer.Start(ctx, "persistence.Queue/ReadMessagesFromDLQ") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("Queue")) + span.SetAttributes(attribute.Key("persistence.method").String("ReadMessagesFromDLQ")) + qpa1, ba1, err = d.Queue.ReadMessagesFromDLQ(ctx, firstMessageID, lastMessageID, pageSize, pageToken) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(firstMessageID, "", " ") + if err != nil { + fmt.Println("failed to serialize int64 for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(ba1, "", " ") + if err != nil { + fmt.Println("failed to serialize []byte for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -187,11 +354,25 @@ func (d telemetryQueue) UpdateAckLevel(ctx context.Context, metadata *_sourcePer ctx, span := d.tracer.Start(ctx, "persistence.Queue/UpdateAckLevel") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("Queue")) + span.SetAttributes(attribute.Key("persistence.method").String("UpdateAckLevel")) + err = d.Queue.UpdateAckLevel(ctx, metadata) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(metadata, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalQueueMetadata for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -200,10 +381,24 @@ func (d telemetryQueue) UpdateDLQAckLevel(ctx context.Context, metadata *_source ctx, span := d.tracer.Start(ctx, "persistence.Queue/UpdateDLQAckLevel") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("Queue")) + span.SetAttributes(attribute.Key("persistence.method").String("UpdateDLQAckLevel")) + err = d.Queue.UpdateDLQAckLevel(ctx, metadata) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(metadata, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalQueueMetadata for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } diff --git a/common/persistence/telemetry/queue_v2_gen.go b/common/persistence/telemetry/queue_v2_gen.go index 5532e5fccfa..050821c6b58 100644 --- a/common/persistence/telemetry/queue_v2_gen.go +++ b/common/persistence/telemetry/queue_v2_gen.go @@ -32,22 +32,28 @@ package telemetry import ( "context" + "encoding/json" + "fmt" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" _sourcePersistence "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/telemetry" ) // telemetryQueueV2 implements QueueV2 interface instrumented with OpenTelemetry. type telemetryQueueV2 struct { _sourcePersistence.QueueV2 - tracer trace.Tracer + tracer trace.Tracer + debugMode bool } // newTelemetryQueueV2 returns telemetryQueueV2. func newTelemetryQueueV2(base _sourcePersistence.QueueV2, tracer trace.Tracer) telemetryQueueV2 { return telemetryQueueV2{ - QueueV2: base, - tracer: tracer, + QueueV2: base, + tracer: tracer, + debugMode: telemetry.DebugMode(), } } @@ -56,11 +62,32 @@ func (d telemetryQueueV2) CreateQueue(ctx context.Context, request *_sourcePersi ctx, span := d.tracer.Start(ctx, "persistence.QueueV2/CreateQueue") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("QueueV2")) + span.SetAttributes(attribute.Key("persistence.method").String("CreateQueue")) + ip1, err = d.QueueV2.CreateQueue(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalCreateQueueRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -69,11 +96,32 @@ func (d telemetryQueueV2) EnqueueMessage(ctx context.Context, request *_sourcePe ctx, span := d.tracer.Start(ctx, "persistence.QueueV2/EnqueueMessage") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("QueueV2")) + span.SetAttributes(attribute.Key("persistence.method").String("EnqueueMessage")) + ip1, err = d.QueueV2.EnqueueMessage(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalEnqueueMessageRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -82,11 +130,32 @@ func (d telemetryQueueV2) ListQueues(ctx context.Context, request *_sourcePersis ctx, span := d.tracer.Start(ctx, "persistence.QueueV2/ListQueues") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("QueueV2")) + span.SetAttributes(attribute.Key("persistence.method").String("ListQueues")) + ip1, err = d.QueueV2.ListQueues(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalListQueuesRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -95,11 +164,32 @@ func (d telemetryQueueV2) RangeDeleteMessages(ctx context.Context, request *_sou ctx, span := d.tracer.Start(ctx, "persistence.QueueV2/RangeDeleteMessages") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("QueueV2")) + span.SetAttributes(attribute.Key("persistence.method").String("RangeDeleteMessages")) + ip1, err = d.QueueV2.RangeDeleteMessages(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalRangeDeleteMessagesRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -108,10 +198,31 @@ func (d telemetryQueueV2) ReadMessages(ctx context.Context, request *_sourcePers ctx, span := d.tracer.Start(ctx, "persistence.QueueV2/ReadMessages") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("QueueV2")) + span.SetAttributes(attribute.Key("persistence.method").String("ReadMessages")) + ip1, err = d.QueueV2.ReadMessages(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalReadMessagesRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } diff --git a/common/persistence/telemetry/shard_store_gen.go b/common/persistence/telemetry/shard_store_gen.go index 6e9ce1f1a92..159f8635535 100644 --- a/common/persistence/telemetry/shard_store_gen.go +++ b/common/persistence/telemetry/shard_store_gen.go @@ -32,15 +32,20 @@ package telemetry import ( "context" + "encoding/json" + "fmt" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" _sourcePersistence "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/telemetry" ) // telemetryMetadataStore implements MetadataStore interface instrumented with OpenTelemetry. type telemetryMetadataStore struct { _sourcePersistence.MetadataStore - tracer trace.Tracer + tracer trace.Tracer + debugMode bool } // newTelemetryMetadataStore returns telemetryMetadataStore. @@ -48,6 +53,7 @@ func newTelemetryMetadataStore(base _sourcePersistence.MetadataStore, tracer tra return telemetryMetadataStore{ MetadataStore: base, tracer: tracer, + debugMode: telemetry.DebugMode(), } } @@ -56,11 +62,32 @@ func (d telemetryMetadataStore) CreateNamespace(ctx context.Context, request *_s ctx, span := d.tracer.Start(ctx, "persistence.MetadataStore/CreateNamespace") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("MetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("CreateNamespace")) + cp1, err = d.MetadataStore.CreateNamespace(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalCreateNamespaceRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -69,11 +96,25 @@ func (d telemetryMetadataStore) DeleteNamespace(ctx context.Context, request *_s ctx, span := d.tracer.Start(ctx, "persistence.MetadataStore/DeleteNamespace") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("MetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("DeleteNamespace")) + err = d.MetadataStore.DeleteNamespace(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.DeleteNamespaceRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -82,11 +123,25 @@ func (d telemetryMetadataStore) DeleteNamespaceByName(ctx context.Context, reque ctx, span := d.tracer.Start(ctx, "persistence.MetadataStore/DeleteNamespaceByName") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("MetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("DeleteNamespaceByName")) + err = d.MetadataStore.DeleteNamespaceByName(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.DeleteNamespaceByNameRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -95,11 +150,25 @@ func (d telemetryMetadataStore) GetMetadata(ctx context.Context) (gp1 *_sourcePe ctx, span := d.tracer.Start(ctx, "persistence.MetadataStore/GetMetadata") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("MetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetMetadata")) + gp1, err = d.MetadataStore.GetMetadata(ctx) if err != nil { span.RecordError(err) } + if d.debugMode { + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -108,11 +177,32 @@ func (d telemetryMetadataStore) GetNamespace(ctx context.Context, request *_sour ctx, span := d.tracer.Start(ctx, "persistence.MetadataStore/GetNamespace") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("MetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetNamespace")) + ip1, err = d.MetadataStore.GetNamespace(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.GetNamespaceRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -121,11 +211,32 @@ func (d telemetryMetadataStore) ListNamespaces(ctx context.Context, request *_so ctx, span := d.tracer.Start(ctx, "persistence.MetadataStore/ListNamespaces") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("MetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("ListNamespaces")) + ip1, err = d.MetadataStore.ListNamespaces(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalListNamespacesRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -134,11 +245,25 @@ func (d telemetryMetadataStore) RenameNamespace(ctx context.Context, request *_s ctx, span := d.tracer.Start(ctx, "persistence.MetadataStore/RenameNamespace") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("MetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("RenameNamespace")) + err = d.MetadataStore.RenameNamespace(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalRenameNamespaceRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -147,10 +272,24 @@ func (d telemetryMetadataStore) UpdateNamespace(ctx context.Context, request *_s ctx, span := d.tracer.Start(ctx, "persistence.MetadataStore/UpdateNamespace") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("MetadataStore")) + span.SetAttributes(attribute.Key("persistence.method").String("UpdateNamespace")) + err = d.MetadataStore.UpdateNamespace(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalUpdateNamespaceRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } diff --git a/common/persistence/telemetry/shared_store_gen.go b/common/persistence/telemetry/shared_store_gen.go index da39e052e6e..e262c43a3f1 100644 --- a/common/persistence/telemetry/shared_store_gen.go +++ b/common/persistence/telemetry/shared_store_gen.go @@ -32,15 +32,20 @@ package telemetry import ( "context" + "encoding/json" + "fmt" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" _sourcePersistence "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/telemetry" ) // telemetryShardStore implements ShardStore interface instrumented with OpenTelemetry. type telemetryShardStore struct { _sourcePersistence.ShardStore - tracer trace.Tracer + tracer trace.Tracer + debugMode bool } // newTelemetryShardStore returns telemetryShardStore. @@ -48,6 +53,7 @@ func newTelemetryShardStore(base _sourcePersistence.ShardStore, tracer trace.Tra return telemetryShardStore{ ShardStore: base, tracer: tracer, + debugMode: telemetry.DebugMode(), } } @@ -56,11 +62,25 @@ func (d telemetryShardStore) AssertShardOwnership(ctx context.Context, request * ctx, span := d.tracer.Start(ctx, "persistence.ShardStore/AssertShardOwnership") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ShardStore")) + span.SetAttributes(attribute.Key("persistence.method").String("AssertShardOwnership")) + err = d.ShardStore.AssertShardOwnership(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.AssertShardOwnershipRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -69,11 +89,32 @@ func (d telemetryShardStore) GetOrCreateShard(ctx context.Context, request *_sou ctx, span := d.tracer.Start(ctx, "persistence.ShardStore/GetOrCreateShard") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ShardStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetOrCreateShard")) + ip1, err = d.ShardStore.GetOrCreateShard(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalGetOrCreateShardRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -82,10 +123,24 @@ func (d telemetryShardStore) UpdateShard(ctx context.Context, request *_sourcePe ctx, span := d.tracer.Start(ctx, "persistence.ShardStore/UpdateShard") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("ShardStore")) + span.SetAttributes(attribute.Key("persistence.method").String("UpdateShard")) + err = d.ShardStore.UpdateShard(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalUpdateShardRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } diff --git a/common/persistence/telemetry/task_store_gen.go b/common/persistence/telemetry/task_store_gen.go index 5b527757bac..9b52eb81b9f 100644 --- a/common/persistence/telemetry/task_store_gen.go +++ b/common/persistence/telemetry/task_store_gen.go @@ -32,15 +32,20 @@ package telemetry import ( "context" + "encoding/json" + "fmt" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" _sourcePersistence "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/telemetry" ) // telemetryTaskStore implements TaskStore interface instrumented with OpenTelemetry. type telemetryTaskStore struct { _sourcePersistence.TaskStore - tracer trace.Tracer + tracer trace.Tracer + debugMode bool } // newTelemetryTaskStore returns telemetryTaskStore. @@ -48,6 +53,7 @@ func newTelemetryTaskStore(base _sourcePersistence.TaskStore, tracer trace.Trace return telemetryTaskStore{ TaskStore: base, tracer: tracer, + debugMode: telemetry.DebugMode(), } } @@ -56,11 +62,32 @@ func (d telemetryTaskStore) CompleteTasksLessThan(ctx context.Context, request * ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/CompleteTasksLessThan") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("CompleteTasksLessThan")) + i1, err = d.TaskStore.CompleteTasksLessThan(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.CompleteTasksLessThanRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -69,11 +96,32 @@ func (d telemetryTaskStore) CountTaskQueuesByBuildId(ctx context.Context, reques ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/CountTaskQueuesByBuildId") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("CountTaskQueuesByBuildId")) + i1, err = d.TaskStore.CountTaskQueuesByBuildId(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.CountTaskQueuesByBuildIdRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -82,11 +130,25 @@ func (d telemetryTaskStore) CreateTaskQueue(ctx context.Context, request *_sourc ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/CreateTaskQueue") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("CreateTaskQueue")) + err = d.TaskStore.CreateTaskQueue(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalCreateTaskQueueRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -95,11 +157,32 @@ func (d telemetryTaskStore) CreateTasks(ctx context.Context, request *_sourcePer ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/CreateTasks") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("CreateTasks")) + cp1, err = d.TaskStore.CreateTasks(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalCreateTasksRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -108,11 +191,25 @@ func (d telemetryTaskStore) DeleteTaskQueue(ctx context.Context, request *_sourc ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/DeleteTaskQueue") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("DeleteTaskQueue")) + err = d.TaskStore.DeleteTaskQueue(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.DeleteTaskQueueRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } @@ -121,11 +218,32 @@ func (d telemetryTaskStore) GetTaskQueue(ctx context.Context, request *_sourcePe ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/GetTaskQueue") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetTaskQueue")) + ip1, err = d.TaskStore.GetTaskQueue(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalGetTaskQueueRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -134,11 +252,32 @@ func (d telemetryTaskStore) GetTaskQueueUserData(ctx context.Context, request *_ ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/GetTaskQueueUserData") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetTaskQueueUserData")) + ip1, err = d.TaskStore.GetTaskQueueUserData(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.GetTaskQueueUserDataRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -147,11 +286,32 @@ func (d telemetryTaskStore) GetTaskQueuesByBuildId(ctx context.Context, request ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/GetTaskQueuesByBuildId") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetTaskQueuesByBuildId")) + sa1, err = d.TaskStore.GetTaskQueuesByBuildId(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.GetTaskQueuesByBuildIdRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -160,11 +320,32 @@ func (d telemetryTaskStore) GetTasks(ctx context.Context, request *_sourcePersis ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/GetTasks") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("GetTasks")) + ip1, err = d.TaskStore.GetTasks(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.GetTasksRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -173,11 +354,32 @@ func (d telemetryTaskStore) ListTaskQueue(ctx context.Context, request *_sourceP ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/ListTaskQueue") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("ListTaskQueue")) + ip1, err = d.TaskStore.ListTaskQueue(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.ListTaskQueueRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -186,11 +388,32 @@ func (d telemetryTaskStore) ListTaskQueueUserDataEntries(ctx context.Context, re ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/ListTaskQueueUserDataEntries") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("ListTaskQueueUserDataEntries")) + ip1, err = d.TaskStore.ListTaskQueueUserDataEntries(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.ListTaskQueueUserDataEntriesRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -199,11 +422,32 @@ func (d telemetryTaskStore) UpdateTaskQueue(ctx context.Context, request *_sourc ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/UpdateTaskQueue") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("UpdateTaskQueue")) + up1, err = d.TaskStore.UpdateTaskQueue(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalUpdateTaskQueueRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + responsePayload, err := json.MarshalIndent(err, "", " ") + if err != nil { + fmt.Println("failed to serialize error for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.response.payload").String(string(responsePayload))) + } + + } + return } @@ -212,10 +456,24 @@ func (d telemetryTaskStore) UpdateTaskQueueUserData(ctx context.Context, request ctx, span := d.tracer.Start(ctx, "persistence.TaskStore/UpdateTaskQueueUserData") defer span.End() + span.SetAttributes(attribute.Key("persistence.store").String("TaskStore")) + span.SetAttributes(attribute.Key("persistence.method").String("UpdateTaskQueueUserData")) + err = d.TaskStore.UpdateTaskQueueUserData(ctx, request) if err != nil { span.RecordError(err) } + if d.debugMode { + + requestPayload, err := json.MarshalIndent(request, "", " ") + if err != nil { + fmt.Println("failed to serialize *_sourcePersistence.InternalUpdateTaskQueueUserDataRequest for OTEL span: " + err.Error()) + } else { + span.SetAttributes(attribute.Key("persistence.request.payload").String(string(requestPayload))) + } + + } + return } diff --git a/service/history/tasks/category.go b/service/history/tasks/category.go index 12f6dccf1d2..2cb1bce5d9b 100644 --- a/service/history/tasks/category.go +++ b/service/history/tasks/category.go @@ -25,6 +25,7 @@ package tasks import ( + "fmt" "strconv" ) @@ -130,6 +131,10 @@ func (c Category) Type() CategoryType { return c.cType } +func (c Category) MarshalText() (text []byte, err error) { + return []byte(fmt.Sprintf("%d %s %s", c.id, c.cType, c.name)), nil +} + func (t CategoryType) String() string { switch t { case CategoryTypeImmediate: