Skip to content

Commit

Permalink
feat(cli): add open telemetry support for Tracetest agent (#3662)
Browse files Browse the repository at this point in the history
* wip

* wip

* chore: add support for otel collector on tracetest

* adding support for OTel tracer on workers

* Add fallback to ignore tracer if no collector endpoint is declared

* adding PR suggestions
  • Loading branch information
danielbdias authored Feb 20, 2024
1 parent 46e50da commit 471fc43
Show file tree
Hide file tree
Showing 14 changed files with 313 additions and 84 deletions.
5 changes: 4 additions & 1 deletion agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/kubeshop/tracetest/agent/proto"
"go.uber.org/zap"
"google.golang.org/grpc"

"go.opentelemetry.io/otel/trace"
)

const (
Expand Down Expand Up @@ -43,6 +45,7 @@ type Client struct {
done chan bool

logger *zap.Logger
tracer trace.Tracer

stopListener func(context.Context, *proto.StopRequest) error
triggerListener func(context.Context, *proto.TriggerRequest) error
Expand Down Expand Up @@ -98,7 +101,7 @@ func (c *Client) Start(ctx context.Context) error {
return err
}

err = c.startHearthBeat(ctx)
err = c.startHeartBeat(ctx)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions agent/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"time"

"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -31,3 +32,9 @@ func WithLogger(logger *zap.Logger) Option {
c.logger = logger
}
}

func WithTracer(tracer trace.Tracer) Option {
return func(c *Client) {
c.tracer = tracer
}
}
2 changes: 1 addition & 1 deletion agent/client/workflow_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/kubeshop/tracetest/agent/proto"
)

func (c *Client) startHearthBeat(ctx context.Context) error {
func (c *Client) startHeartBeat(ctx context.Context) error {
client := proto.NewOrchestratorClient(c.conn)
ticker := time.NewTicker(c.config.PingPeriod)

Expand Down
10 changes: 7 additions & 3 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
)

type Config struct {
APIKey string `mapstructure:"api_key"`
Name string `mapstructure:"agent_name"`
ServerURL string `mapstructure:"server_url"`
APIKey string `mapstructure:"api_key"`
Name string `mapstructure:"agent_name"`
ServerURL string `mapstructure:"server_url"`
CollectorEndpoint string `mapstructure:"collector_endpoint"`
Mode string `mapstructure:"mode"`

OTLPServer OtlpServer `mapstructure:"otlp_server"`
}
Expand Down Expand Up @@ -45,6 +47,8 @@ func LoadConfig() (Config, error) {
vp.SetDefault("AGENT_NAME", getHostname())
vp.SetDefault("API_KEY", "")
vp.SetDefault("SERVER_URL", "https://app.tracetest.io")
vp.SetDefault("COLLECTOR_ENDPOINT", "")
vp.SetDefault("MODE", "")
vp.SetDefault("OTLP_SERVER.GRPC_PORT", 4317)
vp.SetDefault("OTLP_SERVER.HTTP_PORT", 4318)

Expand Down
17 changes: 9 additions & 8 deletions agent/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ const (
)

type Flags struct {
ServerURL string
OrganizationID string
EnvironmentID string
CI bool
AgentApiKey string
Token string
Mode Mode
LogLevel string
ServerURL string
OrganizationID string
EnvironmentID string
CI bool
AgentApiKey string
Token string
Mode Mode
LogLevel string
CollectorEndpoint string
}
39 changes: 26 additions & 13 deletions agent/runner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/kubeshop/tracetest/agent/config"
"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
"github.com/kubeshop/tracetest/agent/workers"
"github.com/kubeshop/tracetest/agent/workers/poller"

Expand All @@ -36,8 +37,14 @@ func (s *Session) WaitUntilDisconnected() {
func StartSession(ctx context.Context, cfg config.Config, observer event.Observer, logger *zap.Logger) (*Session, error) {
observer = event.WrapObserver(observer)

tracer, err := telemetry.GetTracer(ctx, cfg.CollectorEndpoint, cfg.Name)
if err != nil {
observer.Error(err)
return nil, err
}

traceCache := collector.NewTraceCache()
controlPlaneClient, err := newControlPlaneClient(ctx, cfg, traceCache, observer, logger)
controlPlaneClient, err := newControlPlaneClient(ctx, cfg, traceCache, observer, logger, tracer)
if err != nil {
return nil, err
}
Expand All @@ -47,7 +54,7 @@ func StartSession(ctx context.Context, cfg config.Config, observer event.Observe
return nil, err
}

agentCollector, err := StartCollector(ctx, cfg, traceCache, observer, logger)
agentCollector, err := StartCollector(ctx, cfg, traceCache, observer, logger, tracer)
if err != nil {
return nil, err
}
Expand All @@ -73,8 +80,8 @@ func StartSession(ctx context.Context, cfg config.Config, observer event.Observe
}, nil
}

func StartCollector(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger) (collector.Collector, error) {
noopTracer := trace.NewNoopTracerProvider().Tracer("noop")
func StartCollector(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger, tracer trace.Tracer) (collector.Collector, error) {

collectorConfig := collector.Config{
HTTPPort: config.OTLPServer.HTTPPort,
GRPCPort: config.OTLPServer.GRPCPort,
Expand All @@ -90,7 +97,7 @@ func StartCollector(ctx context.Context, config config.Config, traceCache collec
collector, err := collector.Start(
ctx,
collectorConfig,
noopTracer,
tracer,
opts...,
)
if err != nil {
Expand All @@ -100,7 +107,7 @@ func StartCollector(ctx context.Context, config config.Config, traceCache collec
return collector, nil
}

func newControlPlaneClient(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger) (*client.Client, error) {
func newControlPlaneClient(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger, tracer trace.Tracer) (*client.Client, error) {
controlPlaneClient, err := client.Connect(ctx, config.ServerURL,
client.WithAPIKey(config.APIKey),
client.WithAgentName(config.Name),
Expand All @@ -116,34 +123,40 @@ func newControlPlaneClient(ctx context.Context, config config.Config, traceCache
stopWorker := workers.NewStopperWorker(
workers.WithStopperObserver(observer),
workers.WithStopperCancelFuncList(processStopper.CancelMap()),
workers.WithStopperTracer(tracer),
)

triggerWorker := workers.NewTriggerWorker(
controlPlaneClient,
workers.WithTraceCache(traceCache),
workers.WithTriggerObserver(observer),
workers.WithTriggerStoppableProcessRunner(processStopper.RunStoppableProcess),
workers.WithTriggerLogger(logger),
workers.WithTriggerTracer(tracer),
)

pollingWorker := workers.NewPollerWorker(
controlPlaneClient,
workers.WithInMemoryDatastore(poller.NewInMemoryDatastore(traceCache)),
workers.WithObserver(observer),
workers.WithPollerObserver(observer),
workers.WithPollerStoppableProcessRunner(processStopper.RunStoppableProcess),
workers.WithPollerLogger(logger),
workers.WithPollerTracer(tracer),
)

dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(controlPlaneClient, observer)

triggerWorker.SetLogger(logger)
pollingWorker.SetLogger(logger)
dataStoreTestConnectionWorker.SetLogger(logger)
dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(
controlPlaneClient,
workers.WithTestConnectionLogger(logger),
workers.WithTestConnectionObserver(observer),
workers.WithTestConnectionTracer(tracer),
)

controlPlaneClient.OnDataStoreTestConnectionRequest(dataStoreTestConnectionWorker.Test)
controlPlaneClient.OnStopRequest(stopWorker.Stop)
controlPlaneClient.OnTriggerRequest(triggerWorker.Trigger)
controlPlaneClient.OnPollingRequest(pollingWorker.Poll)
controlPlaneClient.OnConnectionClosed(func(ctx context.Context, sr *proto.ShutdownRequest) error {
fmt.Printf("Server terminated the connection with the agent. Reason: %s\n", sr.Reason)
logger.Info(fmt.Sprintf("Server terminated the connection with the agent. Reason: %s\n", sr.Reason))
return controlPlaneClient.Close()
})

Expand Down
88 changes: 88 additions & 0 deletions agent/telemetry/tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package telemetry

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdkTrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const spanExporterTimeout = 1 * time.Minute

func GetTracer(ctx context.Context, otelExporterEndpoint, serviceName string) (trace.Tracer, error) {
if otelExporterEndpoint == "" {
// fallback, return noop
return trace.NewNoopTracerProvider().Tracer("noop"), nil
}

realServiceName := fmt.Sprintf("tracetestAgent_%s", serviceName)

spanExporter, err := newSpanExporter(ctx, otelExporterEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to setup span exporter: %w", err)
}

traceProvider, err := newTraceProvider(ctx, spanExporter, realServiceName)
if err != nil {
return nil, fmt.Errorf("failed to setup trace provider: %w", err)
}

return traceProvider.Tracer(realServiceName), nil
}

func newSpanExporter(ctx context.Context, otelExporterEndpoint string) (sdkTrace.SpanExporter, error) {
ctx, cancel := context.WithTimeout(ctx, spanExporterTimeout)
defer cancel()

conn, err := grpc.DialContext(ctx, otelExporterEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
}

traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}

return traceExporter, nil
}

func newTraceProvider(ctx context.Context, spanExporter sdkTrace.SpanExporter, serviceName string) (*sdkTrace.TracerProvider, error) {
defaultResource := resource.Default()

mergedResource, err := resource.Merge(
defaultResource,
resource.NewWithAttributes(
defaultResource.SchemaURL(),
semconv.ServiceNameKey.String(serviceName),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create otel resource: %w", err)
}

tp := sdkTrace.NewTracerProvider(
sdkTrace.WithBatcher(spanExporter),
sdkTrace.WithResource(mergedResource),
)

otel.SetTracerProvider(tp)

otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
),
)

return tp, nil
}
33 changes: 22 additions & 11 deletions agent/workers/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (

type PollerWorker struct {
client *client.Client
tracer trace.Tracer
sentSpanIDs *gocache.Cache[string, bool]
inmemoryDatastore tracedb.TraceDB
logger *zap.Logger
observer event.Observer
stoppableProcessRunner StoppableProcessRunner
tracer trace.Tracer
}

type PollerOption func(*PollerWorker)
Expand All @@ -39,7 +39,7 @@ func WithInMemoryDatastore(datastore tracedb.TraceDB) PollerOption {
}
}

func WithObserver(observer event.Observer) PollerOption {
func WithPollerObserver(observer event.Observer) PollerOption {
return func(pw *PollerWorker) {
pw.observer = observer
}
Expand All @@ -51,16 +51,25 @@ func WithPollerStoppableProcessRunner(stoppableProcessRunner StoppableProcessRun
}
}

func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker {
// TODO: use a real tracer
tracer := trace.NewNoopTracerProvider().Tracer("noop")
func WithPollerLogger(logger *zap.Logger) PollerOption {
return func(pw *PollerWorker) {
pw.logger = logger
}
}

func WithPollerTracer(tracer trace.Tracer) PollerOption {
return func(pw *PollerWorker) {
pw.tracer = tracer
}
}

func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker {
pollerWorker := &PollerWorker{
client: client,
tracer: tracer,
sentSpanIDs: gocache.New[string, bool](),
logger: zap.NewNop(),
observer: event.NewNopObserver(),
tracer: trace.NewNoopTracerProvider().Tracer("noop"),
}

for _, opt := range opts {
Expand All @@ -70,11 +79,10 @@ func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker
return pollerWorker
}

func (w *PollerWorker) SetLogger(logger *zap.Logger) {
w.logger = logger
}

func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest) error {
ctx, span := w.tracer.Start(ctx, "PollingRequest Worker operation")
defer span.End()

w.logger.Debug("Received polling request", zap.Any("request", request))
w.observer.StartTracePoll(request)

Expand Down Expand Up @@ -110,7 +118,10 @@ func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest)
w.logger.Error("Error sending polling error", zap.Error(sendErr))
w.observer.Error(sendErr)

return fmt.Errorf("could not report polling error back to the server: %w. Original error: %s", sendErr, err.Error())
formattedErr := fmt.Errorf("could not report polling error back to the server: %w. Original error: %s", sendErr, err.Error())
span.RecordError(formattedErr)

return formattedErr
}
}

Expand Down
Loading

0 comments on commit 471fc43

Please sign in to comment.