Skip to content

Commit

Permalink
feat(tracemode): Tracemode v1
Browse files Browse the repository at this point in the history
xoscar committed Sep 26, 2024
1 parent e5545b6 commit f0794ea
Showing 36 changed files with 2,780 additions and 218 deletions.
12 changes: 12 additions & 0 deletions agent/client/client.go
Original file line number Diff line number Diff line change
@@ -56,6 +56,8 @@ type Client struct {
graphqlIntrospectionListener func(context.Context, *proto.GraphqlIntrospectRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
dataStoreConnectionListener func(context.Context, *proto.DataStoreConnectionTestRequest) error
traceModeListener func(context.Context, *proto.TraceModeRequest) error
getTraceListener func(context.Context, *proto.GetTraceRequest) error
otlpConnectionTestListener func(context.Context, *proto.OTLPConnectionTestRequest) error
}

@@ -126,6 +128,12 @@ func (c *Client) Start(ctx context.Context) error {
return err
}

err = c.startTraceModeListener(ctx)
if err != nil {
c.logger.Error("Failed to start list traces listener", zap.Error(err))
return err
}

c.logger.Debug("ControlPlane client started")

return nil
@@ -161,6 +169,10 @@ func (c *Client) OnDataStoreTestConnectionRequest(listener func(context.Context,
c.dataStoreConnectionListener = listener
}

func (c *Client) OnTraceModeRequest(listener func(context.Context, *proto.TraceModeRequest) error) {
c.traceModeListener = listener
}

func (c *Client) OnPollingRequest(listener func(context.Context, *proto.PollingRequest) error) {
c.pollListener = listener
}
35 changes: 35 additions & 0 deletions agent/client/mocks/grpc_server.go
Original file line number Diff line number Diff line change
@@ -22,12 +22,14 @@ type GrpcServerMock struct {
terminationChannel chan Message[*proto.ShutdownRequest]
dataStoreTestChannel chan Message[*proto.DataStoreConnectionTestRequest]
graphqlIntrospectionChannel chan Message[*proto.GraphqlIntrospectRequest]
traceModeChannel chan Message[*proto.TraceModeRequest]

lastTriggerResponse Message[*proto.TriggerResponse]
lastPollingResponse Message[*proto.PollingResponse]
lastOtlpConnectionResponse Message[*proto.OTLPConnectionTestResponse]
lastDataStoreConnectionResponse Message[*proto.DataStoreConnectionTestResponse]
lastGraphqlIntrospectionResponse Message[*proto.GraphqlIntrospectResponse]
lastTraceModeResponse Message[*proto.TraceModeResponse]

server *grpc.Server
}
@@ -45,6 +47,7 @@ func NewGrpcServer() *GrpcServerMock {
dataStoreTestChannel: make(chan Message[*proto.DataStoreConnectionTestRequest]),
otlpConnectionTestChannel: make(chan Message[*proto.OTLPConnectionTestRequest]),
graphqlIntrospectionChannel: make(chan Message[*proto.GraphqlIntrospectRequest]),
traceModeChannel: make(chan Message[*proto.TraceModeRequest]),
}
var wg sync.WaitGroup
wg.Add(1)
@@ -177,6 +180,21 @@ func (s *GrpcServerMock) RegisterGraphqlIntrospectListener(id *proto.AgentIdenti
}
}

func (s *GrpcServerMock) RegisterTraceModeAgent(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterTraceModeAgentServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
}

for {
traceModeRequest := <-s.traceModeChannel

err := stream.Send(traceModeRequest.Data)
if err != nil {
log.Println("could not trace mode request to agent: %w", err)
}
}
}

func (s *GrpcServerMock) RegisterOTLPConnectionTestListener(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterOTLPConnectionTestListenerServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
@@ -228,6 +246,15 @@ func (s *GrpcServerMock) SendGraphqlIntrospectResult(ctx context.Context, result
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) SendTraceModeResponse(ctx context.Context, result *proto.TraceModeResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
}

s.lastTraceModeResponse = Message[*proto.TraceModeResponse]{Data: result, Context: ctx}
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) RegisterShutdownListener(_ *proto.AgentIdentification, stream proto.Orchestrator_RegisterShutdownListenerServer) error {
for {
shutdownRequest := <-s.terminationChannel
@@ -257,6 +284,10 @@ func (s *GrpcServerMock) SendGraphqlIntrospectionRequest(ctx context.Context, re
s.graphqlIntrospectionChannel <- Message[*proto.GraphqlIntrospectRequest]{Context: ctx, Data: request}
}

func (s *GrpcServerMock) SendTraceModeRequest(ctx context.Context, request *proto.TraceModeRequest) {
s.traceModeChannel <- Message[*proto.TraceModeRequest]{Context: ctx, Data: request}
}

func (s *GrpcServerMock) SendOTLPConnectionTestRequest(ctx context.Context, request *proto.OTLPConnectionTestRequest) {
s.otlpConnectionTestChannel <- Message[*proto.OTLPConnectionTestRequest]{Context: ctx, Data: request}
}
@@ -281,6 +312,10 @@ func (s *GrpcServerMock) GetLastGraphqlIntrospectionResponse() Message[*proto.Gr
return s.lastGraphqlIntrospectionResponse
}

func (s *GrpcServerMock) GetLastTraceModeResponse() Message[*proto.TraceModeResponse] {
return s.lastTraceModeResponse
}

func (s *GrpcServerMock) TerminateConnection(ctx context.Context, reason string) {
s.terminationChannel <- Message[*proto.ShutdownRequest]{
Context: ctx,
63 changes: 63 additions & 0 deletions agent/client/workflow_listen_for_trace_mode_requests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package client

import (
"context"
"fmt"
"log"
"time"

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
"go.uber.org/zap"
)

func (c *Client) startTraceModeListener(ctx context.Context) error {
logger := c.logger.Named("trace_mode_listener")
logger.Debug("Starting")

client := proto.NewOrchestratorClient(c.conn)

stream, err := client.RegisterTraceModeAgent(ctx, c.sessionConfig.AgentIdentification)
if err != nil {
logger.Error("could not open agent stream", zap.Error(err))
return fmt.Errorf("could not open agent stream: %w", err)
}

go func() {
for {
req := proto.TraceModeRequest{}
err := stream.RecvMsg(&req)
if err != nil {
logger.Error("could not get message from stop stream", zap.Error(err))
}
if isEndOfFileError(err) || isCancelledError(err) {
logger.Debug("stop stream closed")
return
}

reconnected, err := c.handleDisconnectionError(err, &req)
if reconnected {
logger.Warn("reconnected to stop stream")
return
}

if err != nil {
logger.Error("could not get message from stop stream", zap.Error(err))
log.Println("could not get message from trace mode stream: %w", err)
time.Sleep(1 * time.Second)
continue
}

// we want a new context per request, not to reuse the one from the stream
ctx := telemetry.InjectMetadataIntoContext(context.Background(), req.Metadata)
go func() {
err = c.traceModeListener(ctx, &req)
if err != nil {
logger.Error("could not handle trace mode request", zap.Error(err))
fmt.Println(err.Error())
}
}()
}
}()
return nil
}
23 changes: 23 additions & 0 deletions agent/client/workflow_send_trace_mode_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package client

import (
"context"
"fmt"

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
)

func (c *Client) SendTraceModeResponse(ctx context.Context, response *proto.TraceModeResponse) error {
client := proto.NewOrchestratorClient(c.conn)

response.AgentIdentification = c.sessionConfig.AgentIdentification
response.Metadata = telemetry.ExtractMetadataFromContext(ctx)

_, err := client.SendTraceModeResponse(ctx, response)
if err != nil {
return fmt.Errorf("could not send list traces result request: %w", err)
}

return nil
}
12 changes: 11 additions & 1 deletion agent/collector/cache.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package collector
import (
"slices"
"sync"
"time"

gocache "github.com/Code-Hex/go-generics-cache"
"go.opentelemetry.io/otel/trace"
@@ -14,6 +15,7 @@ type TraceCache interface {
Append(string, []*v1.Span)
RemoveSpans(string, []string)
Exists(string) bool
Keys() []string
}

type traceCache struct {
@@ -30,6 +32,14 @@ func (c *traceCache) Get(traceID string) ([]*v1.Span, bool) {
return c.internalCache.Get(traceID)
}

// List implements TraceCache.
func (c *traceCache) Keys() []string {
c.mutex.Lock()
defer c.mutex.Unlock()

return c.internalCache.Keys()
}

// Append implements TraceCache.
func (c *traceCache) Append(traceID string, spans []*v1.Span) {
c.mutex.Lock()
@@ -42,7 +52,7 @@ func (c *traceCache) Append(traceID string, spans []*v1.Span) {
spans = append(existingTraces, spans...)

c.internalCache.Set(traceID, spans)
c.receivedSpans.Set(traceID, currentNumberSpans)
c.receivedSpans.Set(traceID, currentNumberSpans, gocache.WithExpiration(time.Minute*10))
}

func (c *traceCache) RemoveSpans(traceID string, spanID []string) {
6 changes: 6 additions & 0 deletions agent/collector/collector.go
Original file line number Diff line number Diff line change
@@ -33,6 +33,12 @@ func WithTraceCache(traceCache TraceCache) CollectorOption {
}
}

func WithTraceMode(traceMode bool) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.traceMode = traceMode
}
}

func WithStartRemoteServer(startRemoteServer bool) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.startRemoteServer = startRemoteServer
20 changes: 19 additions & 1 deletion agent/collector/ingester.go
Original file line number Diff line number Diff line change
@@ -9,8 +9,10 @@ import (
"github.com/kubeshop/tracetest/agent/ui/dashboard/events"
"github.com/kubeshop/tracetest/agent/ui/dashboard/sensors"
"github.com/kubeshop/tracetest/server/otlp"
"github.com/kubeshop/tracetest/server/traces"
"go.opencensus.io/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v11 "go.opentelemetry.io/proto/otlp/common/v1"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
"go.uber.org/zap"
)
@@ -73,6 +75,7 @@ type remoteIngesterConfig struct {
logger *zap.Logger
observer event.Observer
sensor sensors.Sensor
traceMode bool
}

func (i *forwardIngester) Statistics() Statistics {
@@ -136,6 +139,21 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
for _, resourceSpan := range resourceSpans {
for _, scopedSpan := range resourceSpan.ScopeSpans {
for _, span := range scopedSpan.Spans {
if scopedSpan.Scope != nil {
span.Attributes = append(span.Attributes, &v11.KeyValue{
Key: traces.MetadataServiceName,
Value: &v11.AnyValue{Value: &v11.AnyValue_StringValue{StringValue: scopedSpan.Scope.Name}},
})

// Add attributes from the resource
span.Attributes = append(span.Attributes, scopedSpan.Scope.Attributes...)
}

// Add attributes from the resource
if resourceSpan.Resource != nil {
span.Attributes = append(span.Attributes, resourceSpan.Resource.Attributes...)
}

traceID := trace.TraceID(span.TraceId).String()
spans[traceID] = append(spans[traceID], span)
}
@@ -148,7 +166,7 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
i.Lock()
i.traceIDs[traceID] = true
i.Unlock()
if _, ok := i.traceCache.Get(traceID); !ok {
if _, ok := i.traceCache.Get(traceID); !ok && !i.RemoteIngester.traceMode {
i.logger.Debug("traceID is not part of a test", zap.String("traceID", traceID))
// traceID is not part of a test
continue
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ type Config struct {
Mode string `mapstructure:"mode"`
Insecure bool `mapstructure:"insecure"`
SkipVerify bool `mapstructure:"skip_verify"`
TraceMode bool `mapstructure:"trace_mode"`

OTLPServer OtlpServer `mapstructure:"otlp_server"`
}
1 change: 1 addition & 0 deletions agent/config/flags.go
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ type Flags struct {
CollectorEndpoint string
Insecure bool
SkipVerify bool
TraceMode bool
}

func (f Flags) AutomatedEnvironmentCanBeInferred() bool {
Loading

0 comments on commit f0794ea

Please sign in to comment.