Skip to content

Commit

Permalink
feat(agent): Adding graphql introspection to agent
Browse files Browse the repository at this point in the history
  • Loading branch information
xoscar committed Aug 23, 2024
1 parent 9571433 commit 853aac2
Show file tree
Hide file tree
Showing 12 changed files with 1,499 additions and 683 deletions.
23 changes: 17 additions & 6 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ type Client struct {
logger *zap.Logger
tracer trace.Tracer

stopListener func(context.Context, *proto.StopRequest) error
triggerListener func(context.Context, *proto.TriggerRequest) error
pollListener func(context.Context, *proto.PollingRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
dataStoreConnectionListener func(context.Context, *proto.DataStoreConnectionTestRequest) error
otlpConnectionTestListener func(context.Context, *proto.OTLPConnectionTestRequest) error
stopListener func(context.Context, *proto.StopRequest) error
triggerListener func(context.Context, *proto.TriggerRequest) error
pollListener func(context.Context, *proto.PollingRequest) error
graphqlIntrospectionListener func(context.Context, *proto.GraphqlIntrospectRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
dataStoreConnectionListener func(context.Context, *proto.DataStoreConnectionTestRequest) error
otlpConnectionTestListener func(context.Context, *proto.OTLPConnectionTestRequest) error
}

func (c *Client) Start(ctx context.Context) error {
Expand Down Expand Up @@ -107,6 +108,12 @@ func (c *Client) Start(ctx context.Context) error {
return err
}

err = c.startGraphqlIntrospectionListener(ctx)
if err != nil {
c.logger.Error("Failed to start graphql introspection listener", zap.Error(err))
return err
}

err = c.startOTLPConnectionTestListener(ctx)
if err != nil {
c.logger.Error("Failed to start OTLP connection test listener", zap.Error(err))
Expand Down Expand Up @@ -158,6 +165,10 @@ func (c *Client) OnPollingRequest(listener func(context.Context, *proto.PollingR
c.pollListener = listener
}

func (c *Client) OnGraphqlIntrospectionRequest(listener func(context.Context, *proto.GraphqlIntrospectRequest) error) {
c.graphqlIntrospectionListener = listener
}

func (c *Client) OnOTLPConnectionTest(listener func(context.Context, *proto.OTLPConnectionTestRequest) error) {
c.otlpConnectionTestListener = listener
}
Expand Down
67 changes: 51 additions & 16 deletions agent/client/mocks/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@ import (

type GrpcServerMock struct {
proto.UnimplementedOrchestratorServer
port int
triggerChannel chan Message[*proto.TriggerRequest]
pollingChannel chan Message[*proto.PollingRequest]
otlpConnectionTestChannel chan Message[*proto.OTLPConnectionTestRequest]
terminationChannel chan Message[*proto.ShutdownRequest]
dataStoreTestChannel chan Message[*proto.DataStoreConnectionTestRequest]

lastTriggerResponse Message[*proto.TriggerResponse]
lastPollingResponse Message[*proto.PollingResponse]
lastOtlpConnectionResponse Message[*proto.OTLPConnectionTestResponse]
lastDataStoreConnectionResponse Message[*proto.DataStoreConnectionTestResponse]
port int
triggerChannel chan Message[*proto.TriggerRequest]
pollingChannel chan Message[*proto.PollingRequest]
otlpConnectionTestChannel chan Message[*proto.OTLPConnectionTestRequest]
terminationChannel chan Message[*proto.ShutdownRequest]
dataStoreTestChannel chan Message[*proto.DataStoreConnectionTestRequest]
graphqlIntrospectionChannel chan Message[*proto.GraphqlIntrospectRequest]

lastTriggerResponse Message[*proto.TriggerResponse]
lastPollingResponse Message[*proto.PollingResponse]
lastOtlpConnectionResponse Message[*proto.OTLPConnectionTestResponse]
lastDataStoreConnectionResponse Message[*proto.DataStoreConnectionTestResponse]
lastGraphqlIntrospectionResponse Message[*proto.GraphqlIntrospectResponse]

server *grpc.Server
}
Expand All @@ -37,11 +39,12 @@ type Message[T any] struct {

func NewGrpcServer() *GrpcServerMock {
server := &GrpcServerMock{
triggerChannel: make(chan Message[*proto.TriggerRequest]),
pollingChannel: make(chan Message[*proto.PollingRequest]),
terminationChannel: make(chan Message[*proto.ShutdownRequest]),
dataStoreTestChannel: make(chan Message[*proto.DataStoreConnectionTestRequest]),
otlpConnectionTestChannel: make(chan Message[*proto.OTLPConnectionTestRequest]),
triggerChannel: make(chan Message[*proto.TriggerRequest]),
pollingChannel: make(chan Message[*proto.PollingRequest]),
terminationChannel: make(chan Message[*proto.ShutdownRequest]),
dataStoreTestChannel: make(chan Message[*proto.DataStoreConnectionTestRequest]),
otlpConnectionTestChannel: make(chan Message[*proto.OTLPConnectionTestRequest]),
graphqlIntrospectionChannel: make(chan Message[*proto.GraphqlIntrospectRequest]),
}
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -159,6 +162,21 @@ func (s *GrpcServerMock) RegisterDataStoreConnectionTestAgent(id *proto.AgentIde
}
}

func (s *GrpcServerMock) RegisterGraphqlIntrospectListener(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterGraphqlIntrospectListenerServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
}

for {
graphqlRequest := <-s.graphqlIntrospectionChannel

err := stream.Send(graphqlRequest.Data)
if err != nil {
log.Println("could not send polling request to agent: %w", err)
}
}
}

func (s *GrpcServerMock) RegisterOTLPConnectionTestListener(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterOTLPConnectionTestListenerServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
Expand Down Expand Up @@ -201,6 +219,15 @@ func (s *GrpcServerMock) SendPolledSpans(ctx context.Context, result *proto.Poll
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) SendGraphqlIntrospectResult(ctx context.Context, result *proto.GraphqlIntrospectResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
}

s.lastGraphqlIntrospectionResponse = Message[*proto.GraphqlIntrospectResponse]{Data: result, Context: ctx}
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) RegisterShutdownListener(_ *proto.AgentIdentification, stream proto.Orchestrator_RegisterShutdownListenerServer) error {
for {
shutdownRequest := <-s.terminationChannel
Expand All @@ -226,6 +253,10 @@ func (s *GrpcServerMock) SendDataStoreConnectionTestRequest(ctx context.Context,
s.dataStoreTestChannel <- Message[*proto.DataStoreConnectionTestRequest]{Context: ctx, Data: request}
}

func (s *GrpcServerMock) SendGraphqlIntrospectionRequest(ctx context.Context, request *proto.GraphqlIntrospectRequest) {
s.graphqlIntrospectionChannel <- Message[*proto.GraphqlIntrospectRequest]{Context: ctx, Data: request}
}

func (s *GrpcServerMock) SendOTLPConnectionTestRequest(ctx context.Context, request *proto.OTLPConnectionTestRequest) {
s.otlpConnectionTestChannel <- Message[*proto.OTLPConnectionTestRequest]{Context: ctx, Data: request}
}
Expand All @@ -246,6 +277,10 @@ func (s *GrpcServerMock) GetLastDataStoreConnectionResponse() Message[*proto.Dat
return s.lastDataStoreConnectionResponse
}

func (s *GrpcServerMock) GetLastGraphqlIntrospectionResponse() Message[*proto.GraphqlIntrospectResponse] {
return s.lastGraphqlIntrospectionResponse
}

func (s *GrpcServerMock) TerminateConnection(ctx context.Context, reason string) {
s.terminationChannel <- Message[*proto.ShutdownRequest]{
Context: ctx,
Expand Down
63 changes: 63 additions & 0 deletions agent/client/workflow_listen_for_graphql_introspection_requests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package client

import (
"context"
"fmt"
"log"
"time"

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
"go.uber.org/zap"
)

func (c *Client) startGraphqlIntrospectionListener(ctx context.Context) error {
logger := c.logger.Named("graphqlIntrospectionListener")
logger.Debug("Starting")

client := proto.NewOrchestratorClient(c.conn)

stream, err := client.RegisterGraphqlIntrospectListener(ctx, c.sessionConfig.AgentIdentification)
if err != nil {
logger.Error("could not open agent stream", zap.Error(err))
return fmt.Errorf("could not open agent stream: %w", err)
}

go func() {
for {
req := proto.GraphqlIntrospectRequest{}
err := stream.RecvMsg(&req)
if err != nil {
logger.Error("could not get message from graphql introspection stream", zap.Error(err))
}
if isEndOfFileError(err) || isCancelledError(err) {
logger.Debug("graphql introspection stream closed")
return
}

reconnected, err := c.handleDisconnectionError(err, &req)
if reconnected {
logger.Warn("reconnected to graphql introspection stream")
return
}

if err != nil {
logger.Error("could not get message from graphql introspection stream", zap.Error(err))
log.Println("could not get message from graphql introspection stream: %w", err)
time.Sleep(1 * time.Second)
continue
}

// we want a new context per request, not to reuse the one from the stream
ctx := telemetry.InjectMetadataIntoContext(context.Background(), req.Metadata)
go func() {
err = c.graphqlIntrospectionListener(ctx, &req)
if err != nil {
logger.Error("could not handle graphql introspection request", zap.Error(err))
fmt.Println(err.Error())
}
}()
}
}()
return nil
}
23 changes: 23 additions & 0 deletions agent/client/workflow_send_graphql_introspection_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package client

import (
"context"
"fmt"

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
)

func (c *Client) SendGraphqlIntrospectionResult(ctx context.Context, response *proto.GraphqlIntrospectResponse) error {
client := proto.NewOrchestratorClient(c.conn)

response.AgentIdentification = c.sessionConfig.AgentIdentification
response.Metadata = telemetry.ExtractMetadataFromContext(ctx)

_, err := client.SendGraphqlIntrospectResult(ctx, response)
if err != nil {
return fmt.Errorf("could not send graphql introspection result request: %w", err)
}

return nil
}
Loading

0 comments on commit 853aac2

Please sign in to comment.