From 1611db71a33d30d482e84514dec5b60a9d5d91e5 Mon Sep 17 00:00:00 2001 From: Matheus Nogueira Date: Wed, 17 Jan 2024 11:36:55 -0300 Subject: [PATCH] feat: otlp connection span count/reset endpoints (#3533) * feat: add new endpoint for requesting the OTLP connection test * wip * make ingester count spans all the time * enable otlp ingester if connection is being tested * implement both otlp get count spans and reset count endpoints * revert changes used to debug feature * revert change * add spacing * fix test * extract default timeout into a const --- api/config.yaml | 7 + api/openapi.yaml | 26 +++ cli/openapi/api_api.go | 188 ++++++++++++++++++ .../model_otlp_test_connection_response.go | 160 +++++++++++++++ server/app/app.go | 21 +- server/http/controller.go | 32 ++- server/http/controller_test.go | 1 + server/openapi/api.go | 4 + server/openapi/api_api.go | 38 ++++ .../model_otlp_test_connection_response.go | 33 +++ server/otlp/ingester.go | 80 +++++++- server/subscription/message.go | 1 - server/testconnection/otlp.go | 124 ++++++++++++ server/testconnection/otlp_test.go | 68 +++++++ web/src/types/Generated.types.ts | 32 +++ 15 files changed, 801 insertions(+), 14 deletions(-) create mode 100644 cli/openapi/model_otlp_test_connection_response.go create mode 100644 server/openapi/model_otlp_test_connection_response.go create mode 100644 server/testconnection/otlp.go create mode 100644 server/testconnection/otlp_test.go diff --git a/api/config.yaml b/api/config.yaml index 7061effe1a..745f392b5c 100644 --- a/api/config.yaml +++ b/api/config.yaml @@ -194,3 +194,10 @@ components: type: array items: $ref: "./config.yaml#/components/schemas/Demo" + OTLPTestConnectionResponse: + type: object + properties: + spanCount: + type: integer + lastSpanTimestamp: + type: date diff --git a/api/openapi.yaml b/api/openapi.yaml index 4b6fa11a57..34b5d1a350 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -720,6 +720,32 @@ paths: application/json: schema: $ref: "./config.yaml#/components/schemas/TestConnectionResponse" + /config/connection/otlp: + get: + tags: + - api + summary: get information about the OTLP connection + description: get information about the OTLP connection + operationId: getOTLPConnectionInformation + responses: + 200: + description: The connection information was retrieved successfully + content: + application/json: + schema: + $ref: "./config.yaml#/components/schemas/OTLPTestConnectionResponse" + 408: + description: The connection information was not available and the connection timed out + /config/connection/otlp/reset: + post: + tags: + - api + summary: reset the OTLP connection span count + description: reset the OTLP connection span count + operationId: resetOTLPConnectionInformation + responses: + 200: + description: Ok /configs: get: tags: diff --git a/cli/openapi/api_api.go b/cli/openapi/api_api.go index 47c1c388bf..718d047f64 100644 --- a/cli/openapi/api_api.go +++ b/cli/openapi/api_api.go @@ -546,6 +546,106 @@ func (a *ApiApiService) ExpressionResolveExecute(r ApiExpressionResolveRequest) return localVarReturnValue, localVarHTTPResponse, nil } +type ApiGetOTLPConnectionInformationRequest struct { + ctx context.Context + ApiService *ApiApiService +} + +func (r ApiGetOTLPConnectionInformationRequest) Execute() (*OTLPTestConnectionResponse, *http.Response, error) { + return r.ApiService.GetOTLPConnectionInformationExecute(r) +} + +/* +GetOTLPConnectionInformation get information about the OTLP connection + +get information about the OTLP connection + + @param ctx context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background(). + @return ApiGetOTLPConnectionInformationRequest +*/ +func (a *ApiApiService) GetOTLPConnectionInformation(ctx context.Context) ApiGetOTLPConnectionInformationRequest { + return ApiGetOTLPConnectionInformationRequest{ + ApiService: a, + ctx: ctx, + } +} + +// Execute executes the request +// +// @return OTLPTestConnectionResponse +func (a *ApiApiService) GetOTLPConnectionInformationExecute(r ApiGetOTLPConnectionInformationRequest) (*OTLPTestConnectionResponse, *http.Response, error) { + var ( + localVarHTTPMethod = http.MethodGet + localVarPostBody interface{} + formFiles []formFile + localVarReturnValue *OTLPTestConnectionResponse + ) + + localBasePath, err := a.client.cfg.ServerURLWithContext(r.ctx, "ApiApiService.GetOTLPConnectionInformation") + if err != nil { + return localVarReturnValue, nil, &GenericOpenAPIError{error: err.Error()} + } + + localVarPath := localBasePath + "/config/connection/otlp" + + localVarHeaderParams := make(map[string]string) + localVarQueryParams := url.Values{} + localVarFormParams := url.Values{} + + // to determine the Content-Type header + localVarHTTPContentTypes := []string{} + + // set Content-Type header + localVarHTTPContentType := selectHeaderContentType(localVarHTTPContentTypes) + if localVarHTTPContentType != "" { + localVarHeaderParams["Content-Type"] = localVarHTTPContentType + } + + // to determine the Accept header + localVarHTTPHeaderAccepts := []string{"application/json"} + + // set Accept header + localVarHTTPHeaderAccept := selectHeaderAccept(localVarHTTPHeaderAccepts) + if localVarHTTPHeaderAccept != "" { + localVarHeaderParams["Accept"] = localVarHTTPHeaderAccept + } + req, err := a.client.prepareRequest(r.ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, formFiles) + if err != nil { + return localVarReturnValue, nil, err + } + + localVarHTTPResponse, err := a.client.callAPI(req) + if err != nil || localVarHTTPResponse == nil { + return localVarReturnValue, localVarHTTPResponse, err + } + + localVarBody, err := ioutil.ReadAll(localVarHTTPResponse.Body) + localVarHTTPResponse.Body.Close() + localVarHTTPResponse.Body = ioutil.NopCloser(bytes.NewBuffer(localVarBody)) + if err != nil { + return localVarReturnValue, localVarHTTPResponse, err + } + + if localVarHTTPResponse.StatusCode >= 300 { + newErr := &GenericOpenAPIError{ + body: localVarBody, + error: localVarHTTPResponse.Status, + } + return localVarReturnValue, localVarHTTPResponse, newErr + } + + err = a.client.decode(&localVarReturnValue, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) + if err != nil { + newErr := &GenericOpenAPIError{ + body: localVarBody, + error: err.Error(), + } + return localVarReturnValue, localVarHTTPResponse, newErr + } + + return localVarReturnValue, localVarHTTPResponse, nil +} + type ApiGetResourcesRequest struct { ctx context.Context ApiService *ApiApiService @@ -2134,6 +2234,94 @@ func (a *ApiApiService) RerunTestRunExecute(r ApiRerunTestRunRequest) (*TestRun, return localVarReturnValue, localVarHTTPResponse, nil } +type ApiResetOTLPConnectionInformationRequest struct { + ctx context.Context + ApiService *ApiApiService +} + +func (r ApiResetOTLPConnectionInformationRequest) Execute() (*http.Response, error) { + return r.ApiService.ResetOTLPConnectionInformationExecute(r) +} + +/* +ResetOTLPConnectionInformation reset the OTLP connection span count + +reset the OTLP connection span count + + @param ctx context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background(). + @return ApiResetOTLPConnectionInformationRequest +*/ +func (a *ApiApiService) ResetOTLPConnectionInformation(ctx context.Context) ApiResetOTLPConnectionInformationRequest { + return ApiResetOTLPConnectionInformationRequest{ + ApiService: a, + ctx: ctx, + } +} + +// Execute executes the request +func (a *ApiApiService) ResetOTLPConnectionInformationExecute(r ApiResetOTLPConnectionInformationRequest) (*http.Response, error) { + var ( + localVarHTTPMethod = http.MethodPost + localVarPostBody interface{} + formFiles []formFile + ) + + localBasePath, err := a.client.cfg.ServerURLWithContext(r.ctx, "ApiApiService.ResetOTLPConnectionInformation") + if err != nil { + return nil, &GenericOpenAPIError{error: err.Error()} + } + + localVarPath := localBasePath + "/config/connection/otlp/reset" + + localVarHeaderParams := make(map[string]string) + localVarQueryParams := url.Values{} + localVarFormParams := url.Values{} + + // to determine the Content-Type header + localVarHTTPContentTypes := []string{} + + // set Content-Type header + localVarHTTPContentType := selectHeaderContentType(localVarHTTPContentTypes) + if localVarHTTPContentType != "" { + localVarHeaderParams["Content-Type"] = localVarHTTPContentType + } + + // to determine the Accept header + localVarHTTPHeaderAccepts := []string{} + + // set Accept header + localVarHTTPHeaderAccept := selectHeaderAccept(localVarHTTPHeaderAccepts) + if localVarHTTPHeaderAccept != "" { + localVarHeaderParams["Accept"] = localVarHTTPHeaderAccept + } + req, err := a.client.prepareRequest(r.ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, formFiles) + if err != nil { + return nil, err + } + + localVarHTTPResponse, err := a.client.callAPI(req) + if err != nil || localVarHTTPResponse == nil { + return localVarHTTPResponse, err + } + + localVarBody, err := ioutil.ReadAll(localVarHTTPResponse.Body) + localVarHTTPResponse.Body.Close() + localVarHTTPResponse.Body = ioutil.NopCloser(bytes.NewBuffer(localVarBody)) + if err != nil { + return localVarHTTPResponse, err + } + + if localVarHTTPResponse.StatusCode >= 300 { + newErr := &GenericOpenAPIError{ + body: localVarBody, + error: localVarHTTPResponse.Status, + } + return localVarHTTPResponse, newErr + } + + return localVarHTTPResponse, nil +} + type ApiRunTestRequest struct { ctx context.Context ApiService *ApiApiService diff --git a/cli/openapi/model_otlp_test_connection_response.go b/cli/openapi/model_otlp_test_connection_response.go new file mode 100644 index 0000000000..f8420e3d71 --- /dev/null +++ b/cli/openapi/model_otlp_test_connection_response.go @@ -0,0 +1,160 @@ +/* +TraceTest + +OpenAPI definition for TraceTest endpoint and resources + +API version: 0.2.1 +*/ + +// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. + +package openapi + +import ( + "encoding/json" +) + +// checks if the OTLPTestConnectionResponse type satisfies the MappedNullable interface at compile time +var _ MappedNullable = &OTLPTestConnectionResponse{} + +// OTLPTestConnectionResponse struct for OTLPTestConnectionResponse +type OTLPTestConnectionResponse struct { + SpanCount *int32 `json:"spanCount,omitempty"` + LastSpanTimestamp *string `json:"lastSpanTimestamp,omitempty"` +} + +// NewOTLPTestConnectionResponse instantiates a new OTLPTestConnectionResponse object +// This constructor will assign default values to properties that have it defined, +// and makes sure properties required by API are set, but the set of arguments +// will change when the set of required properties is changed +func NewOTLPTestConnectionResponse() *OTLPTestConnectionResponse { + this := OTLPTestConnectionResponse{} + return &this +} + +// NewOTLPTestConnectionResponseWithDefaults instantiates a new OTLPTestConnectionResponse object +// This constructor will only assign default values to properties that have it defined, +// but it doesn't guarantee that properties required by API are set +func NewOTLPTestConnectionResponseWithDefaults() *OTLPTestConnectionResponse { + this := OTLPTestConnectionResponse{} + return &this +} + +// GetSpanCount returns the SpanCount field value if set, zero value otherwise. +func (o *OTLPTestConnectionResponse) GetSpanCount() int32 { + if o == nil || isNil(o.SpanCount) { + var ret int32 + return ret + } + return *o.SpanCount +} + +// GetSpanCountOk returns a tuple with the SpanCount field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *OTLPTestConnectionResponse) GetSpanCountOk() (*int32, bool) { + if o == nil || isNil(o.SpanCount) { + return nil, false + } + return o.SpanCount, true +} + +// HasSpanCount returns a boolean if a field has been set. +func (o *OTLPTestConnectionResponse) HasSpanCount() bool { + if o != nil && !isNil(o.SpanCount) { + return true + } + + return false +} + +// SetSpanCount gets a reference to the given int32 and assigns it to the SpanCount field. +func (o *OTLPTestConnectionResponse) SetSpanCount(v int32) { + o.SpanCount = &v +} + +// GetLastSpanTimestamp returns the LastSpanTimestamp field value if set, zero value otherwise. +func (o *OTLPTestConnectionResponse) GetLastSpanTimestamp() string { + if o == nil || isNil(o.LastSpanTimestamp) { + var ret string + return ret + } + return *o.LastSpanTimestamp +} + +// GetLastSpanTimestampOk returns a tuple with the LastSpanTimestamp field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *OTLPTestConnectionResponse) GetLastSpanTimestampOk() (*string, bool) { + if o == nil || isNil(o.LastSpanTimestamp) { + return nil, false + } + return o.LastSpanTimestamp, true +} + +// HasLastSpanTimestamp returns a boolean if a field has been set. +func (o *OTLPTestConnectionResponse) HasLastSpanTimestamp() bool { + if o != nil && !isNil(o.LastSpanTimestamp) { + return true + } + + return false +} + +// SetLastSpanTimestamp gets a reference to the given string and assigns it to the LastSpanTimestamp field. +func (o *OTLPTestConnectionResponse) SetLastSpanTimestamp(v string) { + o.LastSpanTimestamp = &v +} + +func (o OTLPTestConnectionResponse) MarshalJSON() ([]byte, error) { + toSerialize, err := o.ToMap() + if err != nil { + return []byte{}, err + } + return json.Marshal(toSerialize) +} + +func (o OTLPTestConnectionResponse) ToMap() (map[string]interface{}, error) { + toSerialize := map[string]interface{}{} + if !isNil(o.SpanCount) { + toSerialize["spanCount"] = o.SpanCount + } + if !isNil(o.LastSpanTimestamp) { + toSerialize["lastSpanTimestamp"] = o.LastSpanTimestamp + } + return toSerialize, nil +} + +type NullableOTLPTestConnectionResponse struct { + value *OTLPTestConnectionResponse + isSet bool +} + +func (v NullableOTLPTestConnectionResponse) Get() *OTLPTestConnectionResponse { + return v.value +} + +func (v *NullableOTLPTestConnectionResponse) Set(val *OTLPTestConnectionResponse) { + v.value = val + v.isSet = true +} + +func (v NullableOTLPTestConnectionResponse) IsSet() bool { + return v.isSet +} + +func (v *NullableOTLPTestConnectionResponse) Unset() { + v.value = nil + v.isSet = false +} + +func NewNullableOTLPTestConnectionResponse(val *OTLPTestConnectionResponse) *NullableOTLPTestConnectionResponse { + return &NullableOTLPTestConnectionResponse{value: val, isSet: true} +} + +func (v NullableOTLPTestConnectionResponse) MarshalJSON() ([]byte, error) { + return json.Marshal(v.value) +} + +func (v *NullableOTLPTestConnectionResponse) UnmarshalJSON(src []byte) error { + v.isSet = true + return json.Unmarshal(src, &v.value) +} diff --git a/server/app/app.go b/server/app/app.go index 3a04fc74e7..7c25f08c96 100644 --- a/server/app/app.go +++ b/server/app/app.go @@ -244,7 +244,7 @@ func (app *App) Start(opts ...appOption) error { if app.cfg.OtlpServerEnabled() { eventEmitter := executor.NewEventEmitter(testDB, subscriptionManager) - registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo, tracer) + registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo, subscriptionManager, tracer) } executorDriverFactory := pipeline.NewDriverFactory[executor.Job](natsConn) @@ -306,6 +306,8 @@ func (app *App) Start(opts ...appOption) error { provisioner := provisioning.New() + otlpConnectionTester := testconnection.NewOTLPConnectionTester(subscriptionManager) + router, mappers := controller(app.cfg, tracer, meter, @@ -320,6 +322,7 @@ func (app *App) Start(opts ...appOption) error { testRepo, runRepo, variableSetRepo, + otlpConnectionTester, tracedbFactory, ) registerWSHandler(router, mappers, subscriptionManager) @@ -387,8 +390,16 @@ func registerSPAHandler(router *mux.Router, cfg httpServerConfig, analyticsEnabl ) } -func registerOtlpServer(app *App, tracesRepo *traces.TraceRepository, runRepository test.RunRepository, eventEmitter executor.EventEmitter, dsRepo *datastore.Repository, tracer trace.Tracer) { - ingester := otlp.NewIngester(tracesRepo, runRepository, eventEmitter, dsRepo, tracer) +func registerOtlpServer( + app *App, + tracesRepo *traces.TraceRepository, + runRepository test.RunRepository, + eventEmitter executor.EventEmitter, + dsRepo *datastore.Repository, + subManager subscription.Manager, + tracer trace.Tracer, +) { + ingester := otlp.NewIngester(tracesRepo, runRepository, eventEmitter, dsRepo, subManager, tracer) grpcOtlpServer := otlp.NewGrpcServer(":4317", ingester, tracer) httpOtlpServer := otlp.NewHttpServer(":4318", ingester) go grpcOtlpServer.Start() @@ -553,6 +564,7 @@ func controller( testRepo test.Repository, testRunRepo test.RunRepository, variablesetRepo *variableset.Repository, + otlpConnectionTester *testconnection.OTLPConnectionTester, tracedbFactory tracedb.FactoryFunc, ) (*mux.Router, mappings.Mappings) { mappers := mappings.New(tracesConversionConfig(), comparator.DefaultRegistry()) @@ -573,6 +585,7 @@ func controller( testRepo, testRunRepo, variablesetRepo, + otlpConnectionTester, tracedbFactory, mappers, @@ -597,6 +610,7 @@ func httpRouter( testRepo test.Repository, testRunRepo test.RunRepository, variableSetRepo *variableset.Repository, + otlpConnectionTester *testconnection.OTLPConnectionTester, tracedbFactory tracedb.FactoryFunc, mappers mappings.Mappings, @@ -614,6 +628,7 @@ func httpRouter( testRepo, testRunRepo, variableSetRepo, + otlpConnectionTester, tracedbFactory, mappers, diff --git a/server/http/controller.go b/server/http/controller.go index 6ab8228ce3..5e20873ba9 100644 --- a/server/http/controller.go +++ b/server/http/controller.go @@ -48,6 +48,8 @@ type controller struct { newTraceDBFn func(ds datastore.DataStore) (tracedb.TraceDB, error) mappers mappings.Mappings version string + + otlpConnectionTester *testconnection.OTLPConnectionTester } type testSuiteRepository interface { @@ -109,6 +111,8 @@ func NewController( testRunRepository test.RunRepository, variableSetGetter variableSetGetter, + otlpConnectionTester *testconnection.OTLPConnectionTester, + newTraceDBFn func(ds datastore.DataStore) (tracedb.TraceDB, error), mappers mappings.Mappings, version string, @@ -123,8 +127,9 @@ func NewController( dsTestPipeline: dsTestPipeline, - testRunner: testRunner, - transactionRunner: transactionRunner, + testRunner: testRunner, + transactionRunner: transactionRunner, + otlpConnectionTester: otlpConnectionTester, tracer: tracer, newTraceDBFn: newTraceDBFn, @@ -717,6 +722,29 @@ func (c *controller) TestConnection(ctx context.Context, dataStore openapi.DataS return openapi.Response(http.StatusOK, c.mappers.Out.ConnectionTestResult(job.TestResult)), nil } +// GetOTLPConnectionInformation implements openapi.ApiApiServicer. +func (c *controller) GetOTLPConnectionInformation(ctx context.Context) (openapi.ImplResponse, error) { + response, err := c.otlpConnectionTester.GetSpanCount(ctx) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return openapi.Response(http.StatusRequestTimeout, nil), err + } + + return openapi.Response(http.StatusInternalServerError, nil), err + } + + return openapi.Response(http.StatusOK, openapi.OtlpTestConnectionResponse{ + SpanCount: int32(response.NumberSpans), + LastSpanTimestamp: response.LastSpanTimestamp.String(), + }), nil +} + +// ResetOTLPConnectionInformation implements openapi.ApiApiServicer. +func (c *controller) ResetOTLPConnectionInformation(ctx context.Context) (openapi.ImplResponse, error) { + c.otlpConnectionTester.ResetSpanCount(ctx) + return openapi.Response(http.StatusOK, nil), nil +} + func (c *controller) GetVersion(ctx context.Context, fileExtension string) (openapi.ImplResponse, error) { version := openapi.Version{ Version: c.version, diff --git a/server/http/controller_test.go b/server/http/controller_test.go index d67017e96d..8d8cd45fc8 100644 --- a/server/http/controller_test.go +++ b/server/http/controller_test.go @@ -133,6 +133,7 @@ func setupController(t *testing.T) controllerFixture { runRepo, nil, nil, + nil, mappings.New(traces.NewConversionConfig(), comparator.DefaultRegistry()), "unit-test", ), diff --git a/server/openapi/api.go b/server/openapi/api.go index 23a6a59748..1085f8df1d 100644 --- a/server/openapi/api.go +++ b/server/openapi/api.go @@ -23,6 +23,7 @@ type ApiApiRouter interface { DryRunAssertion(http.ResponseWriter, *http.Request) ExportTestRun(http.ResponseWriter, *http.Request) ExpressionResolve(http.ResponseWriter, *http.Request) + GetOTLPConnectionInformation(http.ResponseWriter, *http.Request) GetResources(http.ResponseWriter, *http.Request) GetRunResultJUnit(http.ResponseWriter, *http.Request) GetTestResultSelectedSpans(http.ResponseWriter, *http.Request) @@ -37,6 +38,7 @@ type ApiApiRouter interface { GetVersion(http.ResponseWriter, *http.Request) ImportTestRun(http.ResponseWriter, *http.Request) RerunTestRun(http.ResponseWriter, *http.Request) + ResetOTLPConnectionInformation(http.ResponseWriter, *http.Request) RunTest(http.ResponseWriter, *http.Request) RunTestSuite(http.ResponseWriter, *http.Request) SkipTraceCollection(http.ResponseWriter, *http.Request) @@ -96,6 +98,7 @@ type ApiApiServicer interface { DryRunAssertion(context.Context, string, int32, TestSpecs) (ImplResponse, error) ExportTestRun(context.Context, string, int32) (ImplResponse, error) ExpressionResolve(context.Context, ResolveRequestInfo) (ImplResponse, error) + GetOTLPConnectionInformation(context.Context) (ImplResponse, error) GetResources(context.Context, int32, int32, string, string, string) (ImplResponse, error) GetRunResultJUnit(context.Context, string, int32) (ImplResponse, error) GetTestResultSelectedSpans(context.Context, string, int32, string) (ImplResponse, error) @@ -110,6 +113,7 @@ type ApiApiServicer interface { GetVersion(context.Context, string) (ImplResponse, error) ImportTestRun(context.Context, ExportedTestInformation) (ImplResponse, error) RerunTestRun(context.Context, string, int32) (ImplResponse, error) + ResetOTLPConnectionInformation(context.Context) (ImplResponse, error) RunTest(context.Context, string, RunInformation) (ImplResponse, error) RunTestSuite(context.Context, string, RunInformation) (ImplResponse, error) SkipTraceCollection(context.Context, string, int32) (ImplResponse, error) diff --git a/server/openapi/api_api.go b/server/openapi/api_api.go index 9dc7b73a60..45acbb0fac 100644 --- a/server/openapi/api_api.go +++ b/server/openapi/api_api.go @@ -80,6 +80,12 @@ func (c *ApiApiController) Routes() Routes { "/api/expressions/resolve", c.ExpressionResolve, }, + { + "GetOTLPConnectionInformation", + strings.ToUpper("Get"), + "/api/config/connection/otlp", + c.GetOTLPConnectionInformation, + }, { "GetResources", strings.ToUpper("Get"), @@ -164,6 +170,12 @@ func (c *ApiApiController) Routes() Routes { "/api/tests/{testId}/run/{runId}/rerun", c.RerunTestRun, }, + { + "ResetOTLPConnectionInformation", + strings.ToUpper("Post"), + "/api/config/connection/otlp/reset", + c.ResetOTLPConnectionInformation, + }, { "RunTest", strings.ToUpper("Post"), @@ -326,6 +338,19 @@ func (c *ApiApiController) ExpressionResolve(w http.ResponseWriter, r *http.Requ } +// GetOTLPConnectionInformation - get information about the OTLP connection +func (c *ApiApiController) GetOTLPConnectionInformation(w http.ResponseWriter, r *http.Request) { + result, err := c.service.GetOTLPConnectionInformation(r.Context()) + // If an error occurred, encode the error with the status code + if err != nil { + c.errorHandler(w, r, err, &result) + return + } + // If no error, encode the body and the result code + EncodeJSONResponse(result.Body, &result.Code, w) + +} + // GetResources - Get resources func (c *ApiApiController) GetResources(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() @@ -641,6 +666,19 @@ func (c *ApiApiController) RerunTestRun(w http.ResponseWriter, r *http.Request) } +// ResetOTLPConnectionInformation - reset the OTLP connection span count +func (c *ApiApiController) ResetOTLPConnectionInformation(w http.ResponseWriter, r *http.Request) { + result, err := c.service.ResetOTLPConnectionInformation(r.Context()) + // If an error occurred, encode the error with the status code + if err != nil { + c.errorHandler(w, r, err, &result) + return + } + // If no error, encode the body and the result code + EncodeJSONResponse(result.Body, &result.Code, w) + +} + // RunTest - run test func (c *ApiApiController) RunTest(w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) diff --git a/server/openapi/model_otlp_test_connection_response.go b/server/openapi/model_otlp_test_connection_response.go new file mode 100644 index 0000000000..012c287fc0 --- /dev/null +++ b/server/openapi/model_otlp_test_connection_response.go @@ -0,0 +1,33 @@ +/* + * TraceTest + * + * OpenAPI definition for TraceTest endpoint and resources + * + * API version: 0.2.1 + * Generated by: OpenAPI Generator (https://openapi-generator.tech) + */ + +package openapi + +type OtlpTestConnectionResponse struct { + SpanCount int32 `json:"spanCount,omitempty"` + + LastSpanTimestamp string `json:"lastSpanTimestamp,omitempty"` +} + +// AssertOtlpTestConnectionResponseRequired checks if the required fields are not zero-ed +func AssertOtlpTestConnectionResponseRequired(obj OtlpTestConnectionResponse) error { + return nil +} + +// AssertRecurseOtlpTestConnectionResponseRequired recursively checks if required fields are not zero-ed in a nested slice. +// Accepts only nested slice of OtlpTestConnectionResponse (e.g. [][]OtlpTestConnectionResponse), otherwise ErrTypeAssertionError is thrown. +func AssertRecurseOtlpTestConnectionResponseRequired(objSlice interface{}) error { + return AssertRecurseInterfaceRequired(objSlice, func(obj interface{}) error { + aOtlpTestConnectionResponse, ok := obj.(OtlpTestConnectionResponse) + if !ok { + return ErrTypeAssertionError + } + return AssertOtlpTestConnectionResponseRequired(aOtlpTestConnectionResponse) + }) +} diff --git a/server/otlp/ingester.go b/server/otlp/ingester.go index 987300de1b..2c8b2ce84a 100644 --- a/server/otlp/ingester.go +++ b/server/otlp/ingester.go @@ -7,11 +7,15 @@ import ( "errors" "fmt" "log" + "sync" + "time" "github.com/kubeshop/tracetest/server/datastore" "github.com/kubeshop/tracetest/server/executor" "github.com/kubeshop/tracetest/server/model/events" + "github.com/kubeshop/tracetest/server/subscription" "github.com/kubeshop/tracetest/server/test" + "github.com/kubeshop/tracetest/server/testconnection" "github.com/kubeshop/tracetest/server/traces" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -38,8 +42,15 @@ type tracePersister interface { UpdateTraceSpans(context.Context, *traces.Trace) error } -func NewIngester(tracePersister tracePersister, runRepository runGetter, eventEmitter executor.EventEmitter, dsRepo *datastore.Repository, tracer trace.Tracer) ingester { - return ingester{ +func NewIngester( + tracePersister tracePersister, + runRepository runGetter, + eventEmitter executor.EventEmitter, + dsRepo *datastore.Repository, + subManager subscription.Manager, + tracer trace.Tracer, +) *ingester { + ingester := ingester{ log: func(format string, args ...interface{}) { log.Printf("[OTLP] "+format, args...) }, @@ -47,22 +58,56 @@ func NewIngester(tracePersister tracePersister, runRepository runGetter, eventEm runGetter: runRepository, eventEmitter: eventEmitter, dsRepo: dsRepo, + subManager: subManager, tracer: tracer, } + + ingester.startTesterListener() + + return &ingester } type ingester struct { + mutex sync.Mutex log func(string, ...interface{}) tracePersister tracePersister runGetter runGetter eventEmitter executor.EventEmitter dsRepo *datastore.Repository + subManager subscription.Manager tracer trace.Tracer + + spanCount int + lastSpanTimestamp time.Time } -func (i ingester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest, requestType RequestType) (*pb.ExportTraceServiceResponse, error) { - ds, err := i.dsRepo.Current(ctx) +func (i *ingester) startTesterListener() { + i.subManager.Subscribe(testconnection.GetSpanCountTopicName(), subscription.NewSubscriberFunction(func(m subscription.Message) error { + i.mutex.Lock() + defer i.mutex.Unlock() + + var response testconnection.OTLPConnectionTestResponse + response.NumberSpans = i.spanCount + response.LastSpanTimestamp = i.lastSpanTimestamp + + i.subManager.Publish(testconnection.PostSpanCountTopicName(), response) + return nil + })) + + i.subManager.Subscribe(testconnection.ResetSpanCountTopicName(), subscription.NewSubscriberFunction(func(m subscription.Message) error { + i.mutex.Lock() + defer i.mutex.Unlock() + + i.lastSpanTimestamp = time.Time{} + i.spanCount = 0 + return nil + })) +} +func (i *ingester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest, requestType RequestType) (*pb.ExportTraceServiceResponse, error) { + i.increaseSpanCount(countSpans(request.ResourceSpans)) + + ds, err := i.dsRepo.Current(ctx) if err != nil || !ds.IsOTLPBasedProvider() { i.log("OTLP server is not enabled. Ignoring request") return &pb.ExportTraceServiceResponse{}, nil @@ -95,7 +140,26 @@ func (i ingester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequ }, nil } -func (i ingester) processTrace(ctx context.Context, modelTrace traces.Trace, ix int, requestType RequestType) error { +func (i *ingester) increaseSpanCount(newSpansCount int) { + i.mutex.Lock() + defer i.mutex.Unlock() + + i.spanCount += newSpansCount + i.lastSpanTimestamp = time.Now() +} + +func countSpans(resourceSpans []*v1.ResourceSpans) int { + count := 0 + for _, resourceSpan := range resourceSpans { + for _, scopeSpan := range resourceSpan.ScopeSpans { + count += len(scopeSpan.Spans) + } + } + + return count +} + +func (i *ingester) processTrace(ctx context.Context, modelTrace traces.Trace, ix int, requestType RequestType) error { ctx, span := i.tracer.Start(ctx, "process otlp trace") span.SetAttributes( attribute.String("tracetest.ingestor.trace_id", modelTrace.ID.String()), @@ -133,7 +197,7 @@ func (i ingester) processTrace(ctx context.Context, modelTrace traces.Trace, ix return nil } -func (i ingester) traces(input []*v1.ResourceSpans) []traces.Trace { +func (i *ingester) traces(input []*v1.ResourceSpans) []traces.Trace { spansByTrace := map[string][]*v1.Span{} for _, rs := range input { @@ -162,7 +226,7 @@ func (i ingester) traces(input []*v1.ResourceSpans) []traces.Trace { var errNoTestRun = errors.New("no test run") -func (i ingester) getOngoinTestRunForTrace(ctx context.Context, trace traces.Trace) (test.Run, error) { +func (i *ingester) getOngoinTestRunForTrace(ctx context.Context, trace traces.Trace) (test.Run, error) { run, err := i.runGetter.GetRunByTraceID(ctx, trace.ID) if errors.Is(err, sql.ErrNoRows) { // trace is not part of any known test run, no need to notify @@ -180,7 +244,7 @@ func (i ingester) getOngoinTestRunForTrace(ctx context.Context, trace traces.Tra return run, nil } -func (i ingester) notify(ctx context.Context, run test.Run, trace traces.Trace, requestType RequestType) error { +func (i *ingester) notify(ctx context.Context, run test.Run, trace traces.Trace, requestType RequestType) error { evt := events.TraceOtlpServerReceivedSpans( run.TestID, run.ID, diff --git a/server/subscription/message.go b/server/subscription/message.go index 0a11271cae..451cf61870 100644 --- a/server/subscription/message.go +++ b/server/subscription/message.go @@ -49,4 +49,3 @@ func DecodeMessage(data []byte) (Message, error) { return m, nil } - diff --git a/server/testconnection/otlp.go b/server/testconnection/otlp.go new file mode 100644 index 0000000000..6211af3414 --- /dev/null +++ b/server/testconnection/otlp.go @@ -0,0 +1,124 @@ +package testconnection + +import ( + "context" + "fmt" + "time" + + "github.com/kubeshop/tracetest/server/http/middleware" + "github.com/kubeshop/tracetest/server/subscription" + "golang.org/x/sync/semaphore" +) + +const defaultTimeout = 30 * time.Second + +type TopicNameOption func(*topicNameConfig) + +type topicNameConfig struct { + TenantID string +} + +func WithTenantID(tenantID string) TopicNameOption { + return func(tnc *topicNameConfig) { + tnc.TenantID = tenantID + } +} + +func GetSpanCountTopicName(opts ...TopicNameOption) string { + var config topicNameConfig + for _, opt := range opts { + opt(&config) + } + + return fmt.Sprintf("otlp_connection_test_get_span_count_%s", config.TenantID) +} + +func PostSpanCountTopicName(opts ...TopicNameOption) string { + var config topicNameConfig + for _, opt := range opts { + opt(&config) + } + + return fmt.Sprintf("otlp_connection_test_span_count_%s", config.TenantID) +} + +func ResetSpanCountTopicName(opts ...TopicNameOption) string { + var config topicNameConfig + for _, opt := range opts { + opt(&config) + } + + return fmt.Sprintf("otlp_connection_test_reset_span_count_%s", config.TenantID) +} + +type OTLPConnectionTester struct { + subscriptionManager subscription.Manager +} + +type OTLPConnectionTestRequest struct{} + +type OTLPConnectionTestResponse struct { + TenantID string + + NumberSpans int + LastSpanTimestamp time.Time +} + +func NewOTLPConnectionTester(subscriptionManager subscription.Manager) *OTLPConnectionTester { + return &OTLPConnectionTester{subscriptionManager: subscriptionManager} +} + +type GetSpanCountOption func(*getSpanCountConfig) + +type getSpanCountConfig struct { + timeout time.Duration +} + +func WithTimeout(timeout time.Duration) GetSpanCountOption { + return func(gscc *getSpanCountConfig) { + gscc.timeout = timeout + } +} + +func (t *OTLPConnectionTester) GetSpanCount(ctx context.Context, opts ...GetSpanCountOption) (OTLPConnectionTestResponse, error) { + config := getSpanCountConfig{ + timeout: defaultTimeout, + } + + for _, opt := range opts { + opt(&config) + } + + ctx, cancel := context.WithTimeout(ctx, config.timeout) + defer cancel() + + semaphore := semaphore.NewWeighted(1) + tenantID := middleware.TenantIDFromContext(ctx) + semaphore.Acquire(ctx, 1) + + var response OTLPConnectionTestResponse + topicName := PostSpanCountTopicName(WithTenantID(tenantID)) + subscriber := subscription.NewSubscriberFunction(func(m subscription.Message) error { + m.DecodeContent(&response) + semaphore.Release(1) + return nil + }) + + t.subscriptionManager.Subscribe(topicName, subscriber) + defer t.subscriptionManager.Unsubscribe(topicName, subscriber.ID()) + + t.subscriptionManager.Publish(GetSpanCountTopicName(WithTenantID(tenantID)), OTLPConnectionTestRequest{}) + + // Acts as a WaitGroup.Wait() that is canceled with the context in case of timeout. + err := semaphore.Acquire(ctx, 1) + if err != nil { + return OTLPConnectionTestResponse{}, fmt.Errorf("could not get span count: %w", err) + } + + return response, nil +} + +func (t *OTLPConnectionTester) ResetSpanCount(ctx context.Context) { + tenantID := middleware.TenantIDFromContext(ctx) + t.subscriptionManager.Publish(ResetSpanCountTopicName(WithTenantID(tenantID)), nil) +} diff --git a/server/testconnection/otlp_test.go b/server/testconnection/otlp_test.go new file mode 100644 index 0000000000..5a9d501705 --- /dev/null +++ b/server/testconnection/otlp_test.go @@ -0,0 +1,68 @@ +package testconnection_test + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/kubeshop/tracetest/server/http/middleware" + "github.com/kubeshop/tracetest/server/subscription" + "github.com/kubeshop/tracetest/server/testconnection" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSuccessfulGetSpanCount(t *testing.T) { + manager := subscription.NewManager() + tester := testconnection.NewOTLPConnectionTester(manager) + + go func() { + time.Sleep(time.Second) + manager.Publish(testconnection.PostSpanCountTopicName(), testconnection.OTLPConnectionTestResponse{ + NumberSpans: 2, + LastSpanTimestamp: time.Now(), + }) + }() + + response, err := tester.GetSpanCount(context.Background(), testconnection.WithTimeout(10*time.Second)) + require.NoError(t, err) + + assert.Equal(t, 2, response.NumberSpans) + assert.False(t, response.LastSpanTimestamp.IsZero()) +} + +func TestGetSpanCountTimeout(t *testing.T) { + // Given that GetSpanCount is called but the "PostSpanCountTopicName" topic never receives a message + // the GetSpanCount call should timeout + + manager := subscription.NewManager() + tester := testconnection.NewOTLPConnectionTester(manager) + + _, err := tester.GetSpanCount(context.Background(), testconnection.WithTimeout(1*time.Second)) + require.Error(t, err) + assert.Contains(t, err.Error(), "context deadline exceeded") +} + +func TestSuccessfulGetSpanCountWithTenantID(t *testing.T) { + tenantID := uuid.NewString() + ctx := context.WithValue(context.Background(), middleware.TenantIDKey, tenantID) + manager := subscription.NewManager() + tester := testconnection.NewOTLPConnectionTester(manager) + + go func() { + time.Sleep(time.Second) + manager.Publish(testconnection.PostSpanCountTopicName(testconnection.WithTenantID(tenantID)), testconnection.OTLPConnectionTestResponse{ + TenantID: tenantID, + NumberSpans: 2, + LastSpanTimestamp: time.Now(), + }) + }() + + response, err := tester.GetSpanCount(ctx, testconnection.WithTimeout(10*time.Second)) + require.NoError(t, err) + + assert.Equal(t, tenantID, response.TenantID) + assert.Equal(t, 2, response.NumberSpans) + assert.False(t, response.LastSpanTimestamp.IsZero()) +} diff --git a/web/src/types/Generated.types.ts b/web/src/types/Generated.types.ts index a3a6bb3bd8..824c1ed591 100644 --- a/web/src/types/Generated.types.ts +++ b/web/src/types/Generated.types.ts @@ -120,6 +120,14 @@ export interface paths { /** Tests the config data store/exporter connection */ post: operations["testConnection"]; }; + "/config/connection/otlp": { + /** get information about the OTLP connection */ + get: operations["getOTLPConnectionInformation"]; + }; + "/config/connection/otlp/reset": { + /** reset the OTLP connection span count */ + post: operations["resetOTLPConnectionInformation"]; + }; "/configs": { /** List Tracetest configuration */ get: operations["listConfiguration"]; @@ -715,6 +723,26 @@ export interface operations { }; }; }; + /** get information about the OTLP connection */ + getOTLPConnectionInformation: { + responses: { + /** The connection information was retrieved successfully */ + 200: { + content: { + "application/json": external["config.yaml"]["components"]["schemas"]["OTLPTestConnectionResponse"]; + }; + }; + /** The connection information was not available and the connection timed out */ + 408: unknown; + }; + }; + /** reset the OTLP connection span count */ + resetOTLPConnectionInformation: { + responses: { + /** Ok */ + 200: unknown; + }; + }; /** List Tracetest configuration */ listConfiguration: { parameters: {}; @@ -1301,6 +1329,10 @@ export interface external { count?: number; items?: external["config.yaml"]["components"]["schemas"]["Demo"][]; }; + OTLPTestConnectionResponse: { + spanCount?: number; + lastSpanTimestamp?: string; + }; }; }; operations: {};