Skip to content

Commit

Permalink
Add tags for Flows along with routes to set and get (#2297)
Browse files Browse the repository at this point in the history
- add tags to telemetry to provide additional visibility
- later these can be added to otel metrics etc
  • Loading branch information
iskakaushik authored Nov 26, 2024
1 parent fb852aa commit c294c0b
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 196 deletions.
16 changes: 14 additions & 2 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/telemetry"
"github.com/PeerDB-io/peer-flow/tags"
)

// alerting service, no cool name :(
Expand Down Expand Up @@ -366,13 +367,24 @@ func (a *Alerter) sendTelemetryMessage(
flowName string,
more string,
level telemetry.Level,
tags ...string,
additionalTags ...string,
) {
allTags := []string{flowName, peerdbenv.PeerDBDeploymentUID()}
allTags = append(allTags, additionalTags...)

if flowTags, err := tags.GetTags(ctx, a.CatalogPool, flowName); err != nil {
logger.Warn("failed to get flow tags", slog.Any("error", err))
} else {
for key, value := range flowTags {
allTags = append(allTags, fmt.Sprintf("%s:%s", key, value))
}
}

details := fmt.Sprintf("[%s] %s", flowName, more)
attributes := telemetry.Attributes{
Level: level,
DeploymentUID: peerdbenv.PeerDBDeploymentUID(),
Tags: append([]string{flowName, peerdbenv.PeerDBDeploymentUID()}, tags...),
Tags: allTags,
Type: flowName,
}

Expand Down
84 changes: 84 additions & 0 deletions flow/cmd/tags_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cmd

import (
"context"
"fmt"
"log/slog"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/tags"
)

func (h *FlowRequestHandler) flowExists(ctx context.Context, flowName string) (bool, error) {
var exists bool
err := h.pool.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM flows WHERE name = $1)", flowName).Scan(&exists)
if err != nil {
slog.Error("error checking if flow exists", slog.Any("error", err))
return false, err
}

slog.Info(fmt.Sprintf("flow %s exists: %t", flowName, exists))
return exists, nil
}

func (h *FlowRequestHandler) CreateOrReplaceFlowTags(
ctx context.Context,
in *protos.CreateOrReplaceFlowTagsRequest,
) (*protos.CreateOrReplaceFlowTagsResponse, error) {
flowName := in.FlowName

exists, err := h.flowExists(ctx, flowName)
if err != nil {
return nil, err
}

if !exists {
slog.Error("flow does not exist", slog.String("flow_name", flowName))
return nil, fmt.Errorf("flow %s does not exist", flowName)
}

tags := make(map[string]string, len(in.Tags))
for _, tag := range in.Tags {
tags[tag.Key] = tag.Value
}

_, err = h.pool.Exec(ctx, "UPDATE flows SET tags = $1 WHERE name = $2", tags, flowName)
if err != nil {
slog.Error("error updating flow tags", slog.Any("error", err))
return nil, err
}

return &protos.CreateOrReplaceFlowTagsResponse{
FlowName: flowName,
}, nil
}

func (h *FlowRequestHandler) GetFlowTags(ctx context.Context, in *protos.GetFlowTagsRequest) (*protos.GetFlowTagsResponse, error) {
flowName := in.FlowName

exists, err := h.flowExists(ctx, flowName)
if err != nil {
return nil, err
}

if !exists {
slog.Error("flow does not exist", slog.String("flow_name", flowName))
return nil, fmt.Errorf("flow %s does not exist", flowName)
}

tags, err := tags.GetTags(ctx, h.pool, flowName)
if err != nil {
slog.Error("error getting flow tags", slog.Any("error", err))
return nil, err
}

protosTags := make([]*protos.FlowTag, 0, len(tags))
for key, value := range tags {
protosTags = append(protosTags, &protos.FlowTag{Key: key, Value: value})
}

return &protos.GetFlowTagsResponse{
FlowName: flowName,
Tags: protosTags,
}, nil
}
24 changes: 24 additions & 0 deletions flow/tags/tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package tags

import (
"context"
"log/slog"

"github.com/jackc/pgx/v5/pgxpool"
)

func GetTags(ctx context.Context, catalogPool *pgxpool.Pool, flowName string) (map[string]string, error) {
var tags map[string]string

err := catalogPool.QueryRow(ctx, "SELECT tags FROM flows WHERE name = $1", flowName).Scan(&tags)
if err != nil {
slog.Error("error getting flow tags", slog.Any("error", err))
return nil, err
}

if tags == nil {
tags = make(map[string]string)
}

return tags, nil
}
2 changes: 2 additions & 0 deletions nexus/catalog/migrations/V41__add_metadata_tags.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE flows
ADD COLUMN tags JSONB;
Loading

0 comments on commit c294c0b

Please sign in to comment.