Skip to content

Commit

Permalink
simplifying introspection
Browse files Browse the repository at this point in the history
  • Loading branch information
xoscar committed Aug 26, 2024
1 parent 50ce2c0 commit 8389adc
Show file tree
Hide file tree
Showing 6 changed files with 610 additions and 621 deletions.
1,029 changes: 506 additions & 523 deletions agent/proto/orchestrator.pb.go

Large diffs are not rendered by default.

10 changes: 4 additions & 6 deletions agent/proto/orchestrator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,15 @@ message OTLPConnectionTestResponse {

message GraphqlIntrospectRequest {
string requestID = 1;
string url = 2;
repeated HttpHeader headers = 3;
map <string, string> metadata = 4;
GraphqlRequest graphql = 2;
map <string, string> metadata = 3;
}

message GraphqlIntrospectResponse {
string requestID = 1;
bool successful = 2;
HttpResponse response = 2;
AgentIdentification agentIdentification = 3;
string schema = 4;
map <string, string> metadata = 5;
map <string, string> metadata = 4;
}

message DataStoreConnectionTestRequest {
Expand Down
140 changes: 67 additions & 73 deletions agent/workers/graphql_introspect.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
package workers

import (
"bytes"
"context"
"encoding/json"
"net/http"
"strings"

"github.com/Khan/genqlient/graphql"
"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
"github.com/wundergraph/graphql-go-tools/v2/pkg/astprinter"
"github.com/wundergraph/graphql-go-tools/v2/pkg/introspection"
"github.com/kubeshop/tracetest/agent/workers/trigger"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

type GraphqlIntrospectWorker struct {
client *client.Client
logger *zap.Logger
tracer trace.Tracer
meter metric.Meter
client *client.Client
logger *zap.Logger
tracer trace.Tracer
meter metric.Meter
graphqlTrigger trigger.Triggerer
}

type GraphqlIntrospectOption func(*GraphqlIntrospectWorker)
Expand All @@ -32,6 +30,12 @@ func WithGraphqlIntrospectLogger(logger *zap.Logger) GraphqlIntrospectOption {
}
}

func WithGraphqlIntrospectTrigger(trigger trigger.Triggerer) GraphqlIntrospectOption {
return func(w *GraphqlIntrospectWorker) {
w.graphqlTrigger = trigger
}
}

func WithGraphqlIntrospectTracer(tracer trace.Tracer) GraphqlIntrospectOption {
return func(w *GraphqlIntrospectWorker) {
w.tracer = tracer
Expand Down Expand Up @@ -59,93 +63,83 @@ func NewGraphqlIntrospectWorker(client *client.Client, opts ...GraphqlIntrospect
return worker
}

type transportHeaders struct {
headers http.Header
wrapped http.RoundTripper
}

func (t *transportHeaders) RoundTrip(req *http.Request) (*http.Response, error) {
for k, v := range t.headers {
req.Header[k] = v
}

return t.wrapped.RoundTrip(req)
}

func (w *GraphqlIntrospectWorker) Introspect(ctx context.Context, request *proto.GraphqlIntrospectRequest) error {
func (w *GraphqlIntrospectWorker) Introspect(ctx context.Context, r *proto.GraphqlIntrospectRequest) error {
ctx, span := w.tracer.Start(ctx, "TestConnectionRequest Worker operation")
defer span.End()

headers := make(http.Header)
for _, header := range request.Headers {
headers.Add(header.Key, header.Value)
}
request := mapProtoToGraphqlRequest(r)

httpClient := http.Client{
Transport: &transportHeaders{
headers: headers,
wrapped: http.DefaultTransport,
},
}

graphqlClient := graphql.NewClient(request.Url, &httpClient)

graphqlRequest := &graphql.Request{
Query: IntrospectionQuery,
}

var graphqlResponse graphql.Response
err := graphqlClient.MakeRequest(ctx, graphqlRequest, &graphqlResponse)
response, err := w.graphqlTrigger.Trigger(ctx, request, nil)
if err != nil {
w.logger.Error("Could not make graphql request", zap.Error(err))
w.logger.Error("Could not send graphql introspection request", zap.Error(err))
span.RecordError(err)
return err
}

json, err := json.Marshal(graphqlResponse.Data)
w.logger.Debug("Sending graphql introspection result", zap.Any("response", response))
err = w.client.SendGraphqlIntrospectionResult(ctx, mapGraphqlToProtoResponse(response.Result.Graphql))
if err != nil {
w.logger.Error("Could not marshal graphql response", zap.Error(err))
w.logger.Error("Could not send graphql introspection result", zap.Error(err))
span.RecordError(err)
return err
} else {
w.logger.Debug("Sent graphql introspection result")
}

converter := introspection.JsonConverter{}
buf := bytes.NewBuffer(json)
return nil
}

doc, err := converter.GraphQLDocument(buf)
if err != nil {
w.logger.Error("Could not convert graphql document", zap.Error(err))
span.RecordError(err)
return err
func mapProtoToGraphqlRequest(r *proto.GraphqlIntrospectRequest) trigger.Trigger {
headers := make([]trigger.HTTPHeader, 0)
for _, header := range r.Graphql.Headers {
headers = append(headers, trigger.HTTPHeader{
Key: header.Key,
Value: header.Value,
})
}

outWriter := &bytes.Buffer{}
err = astprinter.PrintIndent(doc, []byte(" "), outWriter)
if err != nil {
w.logger.Error("Could not print graphql document", zap.Error(err))
span.RecordError(err)
return err
request := &trigger.GraphqlRequest{
URL: r.Graphql.Url,
SSLVerification: r.Graphql.SSLVerification,
Headers: headers,
}

response := &proto.GraphqlIntrospectResponse{
RequestID: request.RequestID,
Successful: true,
Schema: outWriter.String(),
body := GraphQLBody{
Query: IntrospectionQuery,
}

w.logger.Debug("Sending datastore connection test result", zap.Any("response", response))
err = w.client.SendGraphqlIntrospectionResult(ctx, response)
if err != nil {
w.logger.Error("Could not send datastore connection test result", zap.Error(err))
span.RecordError(err)
} else {
w.logger.Debug("Sent datastore connection test result")
json, _ := json.Marshal(body)
request.Body = string(json)

return trigger.Trigger{
Type: trigger.TriggerTypeGraphql,
Graphql: request,
}
}

return nil
type GraphQLBody struct {
Query string `json:"query"`
}

func mapGraphqlToProtoResponse(r *trigger.GraphqlResponse) *proto.GraphqlIntrospectResponse {
headers := make([]*proto.HttpHeader, 0)
for _, header := range r.Headers {
headers = append(headers, &proto.HttpHeader{
Key: header.Key,
Value: header.Value,
})
}

return &proto.GraphqlIntrospectResponse{
Response: &proto.HttpResponse{
StatusCode: int32(r.StatusCode),
Status: r.Status,
Headers: headers,
Body: []byte(r.Body),
},
}
}

var IntrospectionQuery = `
var IntrospectionQuery = strings.ReplaceAll(`
query IntrospectionQuery {
__schema {
queryType { name }
Expand Down Expand Up @@ -237,4 +231,4 @@ var IntrospectionQuery = `
}
}
}
`
`, "\n", "")
31 changes: 29 additions & 2 deletions agent/workers/graphql_introspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,36 @@ import (
"github.com/kubeshop/tracetest/agent/client/mocks"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/workers"
"github.com/kubeshop/tracetest/agent/workers/trigger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type mockGraphqlTrigger struct{}

func (m mockGraphqlTrigger) Trigger(ctx context.Context, triggerConfig trigger.Trigger, opts *trigger.Options) (trigger.Response, error) {
return trigger.Response{
Result: trigger.TriggerResult{
Type: trigger.TriggerTypeGraphql,
Graphql: &trigger.GraphqlResponse{
StatusCode: 200,
},
},
}, nil
}

func (m mockGraphqlTrigger) Type() trigger.TriggerType {
return trigger.TriggerTypeGraphql
}

func TestGraphqlIntrospectionWorker(t *testing.T) {
ctx := ContextWithTracingEnabled()
controlPlane := mocks.NewGrpcServer()

client, err := client.Connect(ctx, controlPlane.Addr(), client.WithInsecure())
require.NoError(t, err)

worker := workers.NewGraphqlIntrospectWorker(client)
worker := workers.NewGraphqlIntrospectWorker(client, workers.WithGraphqlIntrospectTrigger(mockGraphqlTrigger{}))

client.OnGraphqlIntrospectionRequest(func(ctx context.Context, pr *proto.GraphqlIntrospectRequest) error {
return worker.Introspect(ctx, pr)
Expand All @@ -30,12 +49,20 @@ func TestGraphqlIntrospectionWorker(t *testing.T) {

request := &proto.GraphqlIntrospectRequest{
RequestID: "test",
Url: "https://swapi-graphql.netlify.app/.netlify/functions/index",
Graphql: &proto.GraphqlRequest{
Url: "https://swapi-graphql.netlify.app/.netlify/functions/index",
Headers: []*proto.HttpHeader{
{Key: "Content-Type", Value: "application/json"},
},
},
}

controlPlane.SendGraphqlIntrospectionRequest(ctx, request)
time.Sleep(1 * time.Second)

resp := controlPlane.GetLastGraphqlIntrospectionResponse()
require.NotNil(t, resp, "agent did not send graphql introspection response back to server")

assert.Equal(t, resp.Data.Response.StatusCode, int32(200))

}
8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/Code-Hex/go-generics-cache v1.3.1
github.com/IBM/sarama v1.40.1
github.com/Jeffail/gabs/v2 v2.7.0
github.com/Khan/genqlient v0.7.0
github.com/Masterminds/semver/v3 v3.2.1
github.com/agnivade/levenshtein v1.1.1
github.com/alecthomas/participle/v2 v2.0.0-alpha8
Expand Down Expand Up @@ -60,7 +59,6 @@ require (
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.9.0
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.75
gitlab.com/metakeule/fmtdate v1.2.2
go.opencensus.io v0.24.0
go.opentelemetry.io/collector/component v0.80.0
Expand Down Expand Up @@ -104,9 +102,7 @@ require (
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/console v1.0.3 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/distribution/reference v0.5.0 // indirect
Expand Down Expand Up @@ -174,16 +170,17 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/segmentio/backo-go v1.0.0 // indirect
github.com/sergi/go-diff v1.3.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/streadway/amqp v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/vektah/gqlparser/v2 v2.5.11 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
Expand All @@ -200,6 +197,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
Expand Down
Loading

0 comments on commit 8389adc

Please sign in to comment.