From b45a600db8dfd9eed627a90d39ef179b2610007a Mon Sep 17 00:00:00 2001 From: Sebastian Choren Date: Mon, 8 Jan 2024 16:01:01 -0300 Subject: [PATCH] chore(agent): remove dead duplicated code (#3498) --- agent/workers/datastores/awsxray.go | 751 ------------------ agent/workers/datastores/azureappinsights.go | 374 --------- .../datastores/connection/connection.go | 47 -- .../connection/connectivity_step.go | 61 -- .../workers/datastores/connection/options.go | 47 -- .../datastores/connection/port_linting.go | 92 --- .../connection/port_linting_test.go | 58 -- agent/workers/datastores/connection/tester.go | 59 -- .../connection/trace_polling_step.go | 40 - .../datastores/datasource/datasource.go | 61 -- agent/workers/datastores/datasource/grpc.go | 134 ---- agent/workers/datastores/datasource/http.go | 147 ---- agent/workers/datastores/datastores.go | 113 --- agent/workers/datastores/datastores_test.go | 143 ---- agent/workers/datastores/elasticsearchdb.go | 298 ------- agent/workers/datastores/jaegerdb.go | 121 --- agent/workers/datastores/opensearchdb.go | 188 ----- agent/workers/datastores/signalfxdb.go | 215 ----- agent/workers/datastores/sumologicdb.go | 315 -------- agent/workers/datastores/tempodb.go | 166 ---- 20 files changed, 3430 deletions(-) delete mode 100644 agent/workers/datastores/awsxray.go delete mode 100644 agent/workers/datastores/azureappinsights.go delete mode 100644 agent/workers/datastores/connection/connection.go delete mode 100644 agent/workers/datastores/connection/connectivity_step.go delete mode 100644 agent/workers/datastores/connection/options.go delete mode 100644 agent/workers/datastores/connection/port_linting.go delete mode 100644 agent/workers/datastores/connection/port_linting_test.go delete mode 100644 agent/workers/datastores/connection/tester.go delete mode 100644 agent/workers/datastores/connection/trace_polling_step.go delete mode 100644 agent/workers/datastores/datasource/datasource.go delete mode 100644 agent/workers/datastores/datasource/grpc.go delete mode 100644 agent/workers/datastores/datasource/http.go delete mode 100644 agent/workers/datastores/datastores.go delete mode 100644 agent/workers/datastores/datastores_test.go delete mode 100644 agent/workers/datastores/elasticsearchdb.go delete mode 100644 agent/workers/datastores/jaegerdb.go delete mode 100644 agent/workers/datastores/opensearchdb.go delete mode 100644 agent/workers/datastores/signalfxdb.go delete mode 100644 agent/workers/datastores/sumologicdb.go delete mode 100644 agent/workers/datastores/tempodb.go diff --git a/agent/workers/datastores/awsxray.go b/agent/workers/datastores/awsxray.go deleted file mode 100644 index e2040b9b41..0000000000 --- a/agent/workers/datastores/awsxray.go +++ /dev/null @@ -1,751 +0,0 @@ -package datastores - -import ( - "context" - "crypto/rand" - "encoding/binary" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "regexp" - "strconv" - "strings" - "time" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/xray" - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/tracedb/connection" - "github.com/kubeshop/tracetest/server/traces" - conventions "go.opentelemetry.io/collector/semconv/v1.6.1" - "go.opentelemetry.io/otel/trace" -) - -type awsxrayDB struct { - realDataStore - - credentials *credentials.Credentials - session *session.Session - region string - service *xray.XRay - useDefaultAuth bool -} - -func NewAwsXRayDB(cfg *datastore.AWSXRayConfig) (DataStore, error) { - sessionCredentials := credentials.NewStaticCredentials(cfg.AccessKeyID, cfg.SecretAccessKey, cfg.SessionToken) - - return &awsxrayDB{ - credentials: sessionCredentials, - region: cfg.Region, - useDefaultAuth: cfg.UseDefaultAuth, - }, nil -} - -func (db *awsxrayDB) GetTraceID() trace.TraceID { - var r [16]byte - epoch := time.Now().Unix() - binary.BigEndian.PutUint32(r[0:4], uint32(epoch)) - _, err := rand.Read(r[4:]) - if err != nil { - panic(err) - } - - return trace.TraceID(r) -} - -func (db *awsxrayDB) Connect(ctx context.Context) error { - awsConfig := &aws.Config{} - - if db.useDefaultAuth { - awsConfig = aws.NewConfig().WithRegion(db.region) - } else { - awsConfig = &aws.Config{ - Region: &db.region, - Credentials: db.credentials, - } - } - sess, err := session.NewSession(awsConfig) - - if err != nil { - return err - } - - db.service = xray.New(sess) - db.session = sess - - return nil -} - -func (db *awsxrayDB) Ready() bool { - return db.service != nil -} - -func (db *awsxrayDB) Close() error { - // Doesn't need to be closed - return nil -} - -func (db *awsxrayDB) GetEndpoints() string { - return fmt.Sprintf("xray.%s.amazonaws.com:443", db.region) -} - -func (db *awsxrayDB) TestConnection(ctx context.Context) model.ConnectionResult { - url := fmt.Sprintf("xray.%s.amazonaws.com:443", db.region) - tester := connection.NewTester( - connection.WithConnectivityTest(connection.ConnectivityStep(model.ProtocolHTTP, url)), - connection.WithPollingTest(connection.TracePollingTestStep(db)), - connection.WithAuthenticationTest(connection.NewTestStep(func(ctx context.Context) (string, error) { - _, err := db.GetTraceByID(ctx, db.GetTraceID().String()) - if err != nil && strings.Contains(strings.ToLower(err.Error()), "403") { - return `Tracetest tried to execute an AWS XRay API request but it failed due to authentication issues`, err - } - - return "Tracetest managed to authenticate with the AWS Services", nil - })), - ) - - return tester.TestConnection(ctx) -} - -func (db *awsxrayDB) GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) { - hexTraceID, err := trace.TraceIDFromHex(traceID) - if err != nil { - return traces.Trace{}, err - } - - parsedTraceID, err := convertToAmazonTraceID(hexTraceID) - if err != nil { - return traces.Trace{}, err - } - - res, err := db.service.BatchGetTraces(&xray.BatchGetTracesInput{ - TraceIds: []*string{&parsedTraceID}, - }) - - if err != nil { - return traces.Trace{}, err - } - - if len(res.Traces) == 0 { - return traces.Trace{}, connection.ErrTraceNotFound - } - - return parseXRayTrace(traceID, res.Traces[0]) -} - -func parseXRayTrace(traceID string, rawTrace *xray.Trace) (traces.Trace, error) { - if len(rawTrace.Segments) == 0 { - return traces.Trace{}, nil - } - - spans := []traces.Span{} - - for _, segment := range rawTrace.Segments { - newSpans, err := parseSegmentToSpans([]byte(*segment.Document), traceID) - - if err != nil { - return traces.Trace{}, err - } - - spans = append(spans, newSpans...) - } - - return traces.NewTrace(traceID, spans), nil -} - -func parseSegmentToSpans(rawSeg []byte, traceID string) ([]traces.Span, error) { - var seg segment - err := json.Unmarshal(rawSeg, &seg) - if err != nil { - return []traces.Span{}, err - } - - err = seg.Validate() - if err != nil { - return []traces.Span{}, err - } - - return segToSpans(seg, traceID, nil) -} - -func segToSpans(seg segment, traceID string, parent *traces.Span) ([]traces.Span, error) { - span, err := generateSpan(&seg, parent) - if err != nil { - return []traces.Span{}, err - } - - spans := []traces.Span{span} - - for _, s := range seg.Subsegments { - nestedSpans, err := segToSpans(s, traceID, &span) - - if err != nil { - return spans, err - } - - spans = append(spans, nestedSpans...) - } - - return spans, nil -} - -func generateSpan(seg *segment, parent *traces.Span) (traces.Span, error) { - attributes := traces.NewAttributes() - span := traces.Span{ - Parent: parent, - Name: *seg.Name, - } - - if seg.ParentID != nil { - parentID, err := decodeXRaySpanID(seg.ParentID) - if err != nil { - return span, err - } - - attributes.Set(traces.TracetestMetadataFieldParentID, parentID.String()) - } else if parent != nil { - attributes.Set(traces.TracetestMetadataFieldParentID, parent.ID.String()) - } - - // decode span id - spanID, err := decodeXRaySpanID(seg.ID) - if err != nil { - return span, err - } - span.ID = spanID - - err = addNamespace(seg, attributes) - if err != nil { - return traces.Span{}, err - } - - span.StartTime = floatSecToTime(seg.StartTime) - if seg.EndTime != nil { - span.EndTime = floatSecToTime(seg.EndTime) - } - - if seg.InProgress != nil { - attributes.Set(AWSXRayInProgressAttribute, strconv.FormatBool(*seg.InProgress)) - } - - attributes.SetPointerValue(conventions.AttributeEnduserID, seg.User) - addHTTP(seg, attributes) - addAWSToSpan(seg.AWS, attributes) - err = addSQLToSpan(seg.SQL, attributes) - if err != nil { - return traces.Span{}, err - } - - if seg.Traced != nil { - attributes.Set(AWSXRayTracedAttribute, strconv.FormatBool(*seg.Traced)) - } - - addAnnotations(seg.Annotations, attributes) - addMetadata(seg.Metadata, attributes) - - // this generates an event that we don't support yet - // addCause(seg, span) - - span.Attributes = attributes - return span, nil -} - -const ( - validAWSNamespace = "aws" - validRemoteNamespace = "remote" -) - -func addNamespace(seg *segment, attributes traces.Attributes) error { - if seg.Namespace != nil { - switch *seg.Namespace { - case validAWSNamespace: - attributes.SetPointerValue(AWSServiceAttribute, seg.Name) - - case validRemoteNamespace: - // no op - default: - return fmt.Errorf("unexpected namespace: %s", *seg.Namespace) - } - return nil - } - - return nil -} - -func addHTTP(seg *segment, attributes traces.Attributes) { - if seg.HTTP == nil { - return - } - - if req := seg.HTTP.Request; req != nil { - attributes.SetPointerValue(conventions.AttributeHTTPMethod, req.Method) - attributes.SetPointerValue(conventions.AttributeHTTPClientIP, req.ClientIP) - attributes.SetPointerValue(conventions.AttributeHTTPUserAgent, req.UserAgent) - attributes.SetPointerValue(conventions.AttributeHTTPURL, req.URL) - - if req.XForwardedFor != nil { - attributes.Set(AWSXRayXForwardedForAttribute, strconv.FormatBool(*req.XForwardedFor)) - } - } - - if resp := seg.HTTP.Response; resp != nil { - if resp.status != nil { - attributes.Set(conventions.AttributeHTTPStatusCode, fmt.Sprintf("%v", *resp.status)) - } - - switch val := resp.contentLength.(type) { - case string: - attributes.Set(conventions.AttributeHTTPResponseContentLength, val) - case float64: - lengthPointer := int64(val) - attributes.Set(conventions.AttributeHTTPResponseContentLength, fmt.Sprintf("%v", lengthPointer)) - } - } -} - -func addAWSToSpan(aws *aWSData, attrs traces.Attributes) { - if aws != nil { - attrs.SetPointerValue(AWSAccountAttribute, aws.AccountID) - attrs.SetPointerValue(AWSOperationAttribute, aws.Operation) - attrs.SetPointerValue(AWSRegionAttribute, aws.RemoteRegion) - attrs.SetPointerValue(AWSRequestIDAttribute, aws.RequestID) - attrs.SetPointerValue(AWSQueueURLAttribute, aws.QueueURL) - attrs.SetPointerValue(AWSTableNameAttribute, aws.TableName) - - if aws.Retries != nil { - attrs.Set(AWSXrayRetriesAttribute, fmt.Sprintf("%v", *aws.Retries)) - } - } -} - -func addSQLToSpan(sql *sQLData, attrs traces.Attributes) error { - if sql == nil { - return nil - } - - if sql.URL != nil { - dbURL, dbName, err := splitSQLURL(*sql.ConnectionString) - if err != nil { - return err - } - - attrs.Set(conventions.AttributeDBConnectionString, dbURL) - attrs.Set(conventions.AttributeDBName, dbName) - } - // not handling sql.ConnectionString for now because the X-Ray exporter - // does not support it - attrs.SetPointerValue(conventions.AttributeDBSystem, sql.DatabaseType) - attrs.SetPointerValue(conventions.AttributeDBStatement, sql.SanitizedQuery) - attrs.SetPointerValue(conventions.AttributeDBUser, sql.User) - return nil -} - -func addAnnotations(annos map[string]interface{}, attrs traces.Attributes) { - if len(annos) > 0 { - for k, v := range annos { - switch t := v.(type) { - case int: - attrs.Set(k, fmt.Sprintf("%v", t)) - case int32: - attrs.Set(k, fmt.Sprintf("%v", t)) - case int64: - attrs.Set(k, fmt.Sprintf("%v", t)) - case string: - attrs.Set(k, t) - case bool: - attrs.Set(k, strconv.FormatBool(t)) - case float32: - attrs.Set(k, fmt.Sprintf("%v", t)) - case float64: - attrs.Set(k, fmt.Sprintf("%v", t)) - default: - } - } - } -} - -func addMetadata(meta map[string]map[string]interface{}, attrs traces.Attributes) error { - for k, v := range meta { - val, err := json.Marshal(v) - if err != nil { - return err - } - attrs.Set(AWSXraySegmentMetadataAttributePrefix+k, string(val)) - } - return nil -} - -// SQL URL is of the format: protocol+transport://host:port/dbName?queryParam -var re = regexp.MustCompile(`^(.+\/\/.+)\/([^\?]+)\??.*$`) - -const ( - dbURLI = 1 - dbNameI = 2 -) - -func splitSQLURL(rawURL string) (string, string, error) { - m := re.FindStringSubmatch(rawURL) - if len(m) == 0 { - return "", "", fmt.Errorf( - "failed to parse out the database name in the \"sql.url\" field, rawUrl: %s", - rawURL, - ) - } - return m[dbURLI], m[dbNameI], nil -} - -func floatSecToTime(epochSec *float64) time.Time { - timestamp := (*epochSec) * float64(time.Second) - return time.Unix(0, int64(timestamp)).UTC() -} - -const ( - traceIDLength = 35 // fixed length of aws trace id - identifierOffset = 11 // offset of identifier within traceID -) - -func convertToAmazonTraceID(traceID trace.TraceID) (string, error) { - const ( - maxAge = 60 * 60 * 24 * 28 - maxSkew = 60 * 5 - ) - - var ( - content = [traceIDLength]byte{} - epochNow = time.Now().Unix() - traceIDBytes = traceID - epoch = int64(binary.BigEndian.Uint32(traceIDBytes[0:4])) - b = [4]byte{} - ) - - delta := epochNow - epoch - if delta > maxAge || delta < -maxSkew { - return "", fmt.Errorf("invalid xray traceid: %s", traceID) - } - - binary.BigEndian.PutUint32(b[0:4], uint32(epoch)) - - content[0] = '1' - content[1] = '-' - hex.Encode(content[2:10], b[0:4]) - content[10] = '-' - hex.Encode(content[identifierOffset:], traceIDBytes[4:16]) // overwrite with identifier - - return string(content[0:traceIDLength]), nil -} - -func decodeXRaySpanID(spanID *string) (trace.SpanID, error) { - sid := [8]byte{} - if spanID == nil { - return sid, errors.New("spanid is null") - } - if len(*spanID) != 16 { - return sid, errors.New("spanID length is wrong") - } - _, err := hex.Decode(sid[:], []byte(*spanID)) - return sid, err -} - -const ( - // TypeStr is the type and ingest format of this receiver - TypeStr = "awsxray" -) - -type CauseType int - -const ( - CauseTypeExceptionID CauseType = iota + 1 - CauseTypeObject -) - -type segment struct { - // Required fields for both segment and subsegments - Name *string `json:"name"` - ID *string `json:"id"` - StartTime *float64 `json:"start_time"` - - // Segment-only optional fields - Service *serviceData `json:"service,omitempty"` - Origin *string `json:"origin,omitempty"` - User *string `json:"user,omitempty"` - ResourceARN *string `json:"resource_arn,omitempty"` - - // Optional fields for both Segment and subsegments - TraceID *string `json:"trace_id,omitempty"` - EndTime *float64 `json:"end_time,omitempty"` - InProgress *bool `json:"in_progress,omitempty"` - HTTP *hTTPData `json:"http,omitempty"` - Fault *bool `json:"fault,omitempty"` - Error *bool `json:"error,omitempty"` - Throttle *bool `json:"throttle,omitempty"` - Cause *causeData `json:"cause,omitempty"` - AWS *aWSData `json:"aws,omitempty"` - Annotations map[string]interface{} `json:"annotations,omitempty"` - Metadata map[string]map[string]interface{} `json:"metadata,omitempty"` - Subsegments []segment `json:"subsegments,omitempty"` - - // (for both embedded and independent) subsegment-only (optional) fields. - // Please refer to https://docs.aws.amazon.com/xray/latest/devguide/xray-api-segmentdocuments.html#api-segmentdocuments-subsegments - // for more information on subsegment. - Namespace *string `json:"namespace,omitempty"` - ParentID *string `json:"parent_id,omitempty"` - Type *string `json:"type,omitempty"` - PrecursorIDs []string `json:"precursor_ids,omitempty"` - Traced *bool `json:"traced,omitempty"` - SQL *sQLData `json:"sql,omitempty"` -} - -// Validate checks whether the segment is valid or not -func (s *segment) Validate() error { - if s.Name == nil { - return errors.New(`segment "name" can not be nil`) - } - - if s.ID == nil { - return errors.New(`segment "id" can not be nil`) - } - - if s.StartTime == nil { - return errors.New(`segment "start_time" can not be nil`) - } - - // it's ok for embedded subsegments to not have trace_id - // but the root segment and independent subsegments must all - // have trace_id. - if s.TraceID == nil { - return errors.New(`segment "trace_id" can not be nil`) - } - - return nil -} - -type aWSData struct { - // Segment-only - Beanstalk *beanstalkMetadata `json:"elastic_beanstalk,omitempty"` - CWLogs []logGroupMetadata `json:"cloudwatch_logs,omitempty"` - ECS *eCSMetadata `json:"ecs,omitempty"` - EC2 *eC2Metadata `json:"ec2,omitempty"` - EKS *eKSMetadata `json:"eks,omitempty"` - XRay *xRayMetaData `json:"xray,omitempty"` - - // For both segment and subsegments - AccountID *string `json:"account_id,omitempty"` - Operation *string `json:"operation,omitempty"` - RemoteRegion *string `json:"region,omitempty"` - RequestID *string `json:"request_id,omitempty"` - QueueURL *string `json:"queue_url,omitempty"` - TableName *string `json:"table_name,omitempty"` - Retries *int64 `json:"retries,omitempty"` -} - -type eC2Metadata struct { - InstanceID *string `json:"instance_id"` - AvailabilityZone *string `json:"availability_zone"` - InstanceSize *string `json:"instance_size"` - AmiID *string `json:"ami_id"` -} - -type eCSMetadata struct { - ContainerName *string `json:"container,omitempty"` - ContainerID *string `json:"container_id,omitempty"` - TaskArn *string `json:"task_arn,omitempty"` - TaskFamily *string `json:"task_family,omitempty"` - ClusterArn *string `json:"cluster_arn,omitempty"` - ContainerArn *string `json:"container_arn,omitempty"` - AvailabilityZone *string `json:"availability_zone,omitempty"` - LaunchType *string `json:"launch_type,omitempty"` -} - -// BeanstalkMetadata represents the Elastic Beanstalk environment metadata field -type beanstalkMetadata struct { - Environment *string `json:"environment_name"` - VersionLabel *string `json:"version_label"` - DeploymentID *int64 `json:"deployment_id"` -} - -// EKSMetadata represents the EKS metadata field -type eKSMetadata struct { - ClusterName *string `json:"cluster_name"` - Pod *string `json:"pod"` - ContainerID *string `json:"container_id"` -} - -// LogGroupMetadata represents a single CloudWatch Log Group -type logGroupMetadata struct { - LogGroup *string `json:"log_group"` - Arn *string `json:"arn,omitempty"` -} - -// CauseData is the container that contains the `cause` field -type causeData struct { - Type CauseType `json:"-"` - // it will contain one of ExceptionID or (WorkingDirectory, Paths, Exceptions) - ExceptionID *string `json:"-"` - - causeObject -} - -type causeObject struct { - WorkingDirectory *string `json:"working_directory,omitempty"` - Paths []string `json:"paths,omitempty"` - Exceptions []exception `json:"exceptions,omitempty"` -} - -// UnmarshalJSON is the custom unmarshaller for the cause field -func (c *causeData) UnmarshalJSON(data []byte) error { - err := json.Unmarshal(data, &c.causeObject) - if err == nil { - c.Type = CauseTypeObject - return nil - } - rawStr := string(data) - if len(rawStr) > 0 && (rawStr[0] != '"' || rawStr[len(rawStr)-1] != '"') { - return fmt.Errorf("the value assigned to the `cause` field does not appear to be a string: %v", data) - } - exceptionID := rawStr[1 : len(rawStr)-1] - - c.Type = CauseTypeExceptionID - c.ExceptionID = &exceptionID - return nil -} - -// Exception represents an exception occurred -type exception struct { - ID *string `json:"id,omitempty"` - Message *string `json:"message,omitempty"` - Type *string `json:"type,omitempty"` - Remote *bool `json:"remote,omitempty"` - Truncated *int64 `json:"truncated,omitempty"` - Skipped *int64 `json:"skipped,omitempty"` - Cause *string `json:"cause,omitempty"` - Stack []stackFrame `json:"stack,omitempty"` -} - -// StackFrame represents a frame in the stack when an exception occurred -type stackFrame struct { - Path *string `json:"path,omitempty"` - Line *int `json:"line,omitempty"` - Label *string `json:"label,omitempty"` -} - -// HTTPData provides the shape for unmarshalling request and response fields. -type hTTPData struct { - Request *requestData `json:"request,omitempty"` - Response *responseData `json:"response,omitempty"` -} - -// RequestData provides the shape for unmarshalling the request field. -type requestData struct { - // Available in segment - XForwardedFor *bool `json:"x_forwarded_for,omitempty"` - - // Available in both segment and subsegments - Method *string `json:"method,omitempty"` - URL *string `json:"url,omitempty"` - UserAgent *string `json:"user_agent,omitempty"` - ClientIP *string `json:"client_ip,omitempty"` -} - -// ResponseData provides the shape for unmarshalling the response field. -type responseData struct { - status *int64 `json:"status,omitempty"` - contentLength interface{} `json:"content_length,omitempty"` -} - -// ECSData provides the shape for unmarshalling the ecs field. -type eCSData struct { - Container *string `json:"container"` -} - -// EC2Data provides the shape for unmarshalling the ec2 field. -type eC2Data struct { - InstanceID *string `json:"instance_id"` - AvailabilityZone *string `json:"availability_zone"` -} - -// ElasticBeanstalkData provides the shape for unmarshalling the elastic_beanstalk field. -type elasticBeanstalkData struct { - EnvironmentName *string `json:"environment_name"` - VersionLabel *string `json:"version_label"` - DeploymentID *int `json:"deployment_id"` -} - -// XRayMetaData provides the shape for unmarshalling the xray field -type xRayMetaData struct { - SDK *string `json:"sdk,omitempty"` - SDKVersion *string `json:"sdk_version,omitempty"` - AutoInstrumentation *bool `json:"auto_instrumentation"` -} - -// SQLData provides the shape for unmarshalling the sql field. -type sQLData struct { - ConnectionString *string `json:"connection_string,omitempty"` - URL *string `json:"url,omitempty"` // protocol://host[:port]/database - SanitizedQuery *string `json:"sanitized_query,omitempty"` - DatabaseType *string `json:"database_type,omitempty"` - DatabaseVersion *string `json:"database_version,omitempty"` - DriverVersion *string `json:"driver_version,omitempty"` - User *string `json:"user,omitempty"` - Preparation *string `json:"preparation,omitempty"` // "statement" / "call" -} - -// ServiceData provides the shape for unmarshalling the service field. -type serviceData struct { - Version *string `json:"version,omitempty"` - CompilerVersion *string `json:"compiler_version,omitempty"` - Compiler *string `json:"compiler,omitempty"` -} - -const ( - AWSOperationAttribute = "aws.operation" - AWSAccountAttribute = "aws.account_id" - AWSRegionAttribute = "aws.region" - AWSRequestIDAttribute = "aws.request_id" - // Currently different instrumentation uses different tag formats. - // TODO(anuraaga): Find current instrumentation and consolidate. - AWSRequestIDAttribute2 = "aws.requestId" - AWSQueueURLAttribute = "aws.queue_url" - AWSQueueURLAttribute2 = "aws.queue.url" - AWSServiceAttribute = "aws.service" - AWSTableNameAttribute = "aws.table_name" - AWSTableNameAttribute2 = "aws.table.name" - - // AWSXRayInProgressAttribute is the `in_progress` flag in an X-Ray segment - AWSXRayInProgressAttribute = "aws.xray.inprogress" - - // AWSXRayXForwardedForAttribute is the `x_forwarded_for` flag in an X-Ray segment - AWSXRayXForwardedForAttribute = "aws.xray.x_forwarded_for" - - // AWSXRayResourceARNAttribute is the `resource_arn` field in an X-Ray segment - AWSXRayResourceARNAttribute = "aws.xray.resource_arn" - - // AWSXRayTracedAttribute is the `traced` field in an X-Ray subsegment - AWSXRayTracedAttribute = "aws.xray.traced" - - // AWSXraySegmentAnnotationsAttribute is the attribute that - // will be treated by the X-Ray exporter as the annotation keys. - AWSXraySegmentAnnotationsAttribute = "aws.xray.annotations" - - // AWSXraySegmentMetadataAttributePrefix is the prefix of the attribute that - // will be treated by the X-Ray exporter as metadata. The key of a metadata - // will be AWSXraySegmentMetadataAttributePrefix + . - AWSXraySegmentMetadataAttributePrefix = "aws.xray.metadata." - - // AWSXrayRetriesAttribute is the `retries` field in an X-Ray (sub)segment. - AWSXrayRetriesAttribute = "aws.xray.retries" - - // AWSXrayExceptionIDAttribute is the `id` field in an exception - AWSXrayExceptionIDAttribute = "aws.xray.exception.id" - // AWSXrayExceptionRemoteAttribute is the `remote` field in an exception - AWSXrayExceptionRemoteAttribute = "aws.xray.exception.remote" - // AWSXrayExceptionTruncatedAttribute is the `truncated` field in an exception - AWSXrayExceptionTruncatedAttribute = "aws.xray.exception.truncated" - // AWSXrayExceptionSkippedAttribute is the `skipped` field in an exception - AWSXrayExceptionSkippedAttribute = "aws.xray.exception.skipped" - // AWSXrayExceptionCauseAttribute is the `cause` field in an exception - AWSXrayExceptionCauseAttribute = "aws.xray.exception.cause" -) diff --git a/agent/workers/datastores/azureappinsights.go b/agent/workers/datastores/azureappinsights.go deleted file mode 100644 index 244344d520..0000000000 --- a/agent/workers/datastores/azureappinsights.go +++ /dev/null @@ -1,374 +0,0 @@ -package datastores - -import ( - "context" - "encoding/json" - "fmt" - "strings" - "time" - - "github.com/Azure/azure-sdk-for-go/sdk/azcore" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" - "github.com/Azure/azure-sdk-for-go/sdk/azidentity" - "github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery" - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/tracedb/connection" - "github.com/kubeshop/tracetest/server/traces" - "go.opentelemetry.io/otel/trace" -) - -var ( - azureLogUrl = "https://api.loganalytics.azure.com" -) - -type azureAppInsightsDB struct { - realDataStore - - resourceArmId string - credentials azcore.TokenCredential - client *azquery.LogsClient -} - -var _ DataStore = &azureAppInsightsDB{} - -func NewAzureAppInsightsDB(config *datastore.AzureAppInsightsConfig) (DataStore, error) { - var credentials azcore.TokenCredential - var err error - if config.UseAzureActiveDirectoryAuth { - credentials, err = azidentity.NewDefaultAzureCredential(nil) - if err != nil { - return nil, err - } - } else { - creds := []azcore.TokenCredential{ - &tokenCredentials{accessToken: config.AccessToken}, - } - - credentials, err = azidentity.NewChainedTokenCredential(creds, nil) - if err != nil { - return nil, err - } - } - - return &azureAppInsightsDB{ - resourceArmId: config.ResourceArmId, - credentials: credentials, - }, nil -} - -func (db *azureAppInsightsDB) Connect(ctx context.Context) error { - client, err := azquery.NewLogsClient(db.credentials, nil) - if err != nil { - return err - } - - db.client = client - return nil -} - -func (db *azureAppInsightsDB) Close() error { - return nil -} - -func (db *azureAppInsightsDB) Ready() bool { - return db.credentials != nil && db.client != nil -} - -func (db *azureAppInsightsDB) GetEndpoints() string { - return azureLogUrl -} - -func (db *azureAppInsightsDB) TestConnection(ctx context.Context) model.ConnectionResult { - url := azureLogUrl - tester := connection.NewTester( - connection.WithConnectivityTest(connection.ConnectivityStep(model.ProtocolHTTP, url)), - connection.WithPollingTest(connection.TracePollingTestStep(db)), - connection.WithAuthenticationTest(connection.NewTestStep(func(ctx context.Context) (string, error) { - _, err := db.GetTraceByID(ctx, db.GetTraceID().String()) - if err != nil && strings.Contains(strings.ToLower(err.Error()), "403") { - return `Tracetest tried to execute an Azure API request but it failed due to authentication issues`, err - } - - return "Tracetest managed to authenticate with the Azure Services", nil - })), - ) - - return tester.TestConnection(ctx) -} - -func (db *azureAppInsightsDB) GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) { - query := fmt.Sprintf("union * | where operation_Id == '%s'", traceID) - body := azquery.Body{ - Query: &query, - } - - res, err := db.client.QueryResource(ctx, db.resourceArmId, body, nil) - if err != nil { - return traces.Trace{}, err - } - - table := res.Tables[0] - if len(table.Rows) == 0 { - return traces.Trace{}, connection.ErrTraceNotFound - } - - return parseAzureAppInsightsTrace(traceID, table) -} - -type spanTable struct { - rows []spanRow -} - -func (st *spanTable) Spans() []spanRow { - output := make([]spanRow, 0) - for _, row := range st.rows { - if row.Type() != "trace" { - output = append(output, row) - } - } - - return output -} - -func (st *spanTable) Events() []spanRow { - output := make([]spanRow, 0) - for _, row := range st.rows { - if row.Type() == "trace" { - output = append(output, row) - } - } - - return output -} - -type spanRow struct { - values map[string]any -} - -func (sr *spanRow) Get(name string) any { - return sr.values[name] -} - -func (sr *spanRow) Type() string { - return sr.values["itemType"].(string) -} - -func (sr *spanRow) ParentID() string { - return sr.values["operation_ParentId"].(string) -} - -func (sr *spanRow) SpanID() string { - return sr.values["id"].(string) -} - -func newSpanTable(table *azquery.Table) spanTable { - spanRows := make([]spanRow, 0, len(table.Rows)) - for _, row := range table.Rows { - spanRows = append(spanRows, newSpanRow(row, table.Columns)) - } - - return spanTable{spanRows} -} - -func newSpanRow(row azquery.Row, columns []*azquery.Column) spanRow { - values := make(map[string]any) - for i, column := range columns { - name := *column.Name - if value := row[i]; value != nil { - values[name] = value - } - } - - return spanRow{values} -} - -func parseAzureAppInsightsTrace(traceID string, table *azquery.Table) (traces.Trace, error) { - spans, err := parseSpans(table) - if err != nil { - return traces.Trace{}, err - } - - return traces.NewTrace(traceID, spans), nil -} - -func parseSpans(table *azquery.Table) ([]traces.Span, error) { - spanTable := newSpanTable(table) - spanRows := spanTable.Spans() - eventRows := spanTable.Events() - - spanEventsMap := make(map[string][]spanRow) - for _, eventRow := range eventRows { - spanEventsMap[eventRow.ParentID()] = append(spanEventsMap[eventRow.ParentID()], eventRow) - } - - spanMap := make(map[string]*traces.Span) - for _, spanRow := range spanRows { - span, err := parseRowToSpan(spanRow) - if err != nil { - return []traces.Span{}, err - } - - spanMap[span.ID.String()] = &span - } - - for _, eventRow := range eventRows { - parentSpan := spanMap[eventRow.ParentID()] - event, err := parseEvent(eventRow) - if err != nil { - return []traces.Span{}, err - } - - parentSpan.Events = append(parentSpan.Events, event) - } - - spans := make([]traces.Span, 0, len(spanMap)) - for _, span := range spanMap { - spans = append(spans, *span) - } - - return spans, nil -} - -func parseEvent(row spanRow) (traces.SpanEvent, error) { - event := traces.SpanEvent{ - Name: row.Get("message").(string), - } - - timestamp, err := time.Parse(time.RFC3339Nano, row.Get("timestamp").(string)) - if err != nil { - return event, fmt.Errorf("could not parse event timestamp: %w", err) - } - - event.Timestamp = timestamp - - attributes := traces.NewAttributes() - rawAttributes := row.Get("customDimensions").(string) - err = json.Unmarshal([]byte(rawAttributes), &attributes) - if err != nil { - return event, fmt.Errorf("could not unmarshal event attributes: %w", err) - } - - event.Attributes = attributes - - return event, nil -} - -func parseRowToSpan(row spanRow) (traces.Span, error) { - attributes := traces.NewAttributes() - span := traces.Span{ - Attributes: attributes, - } - var duration time.Duration - - for name, value := range row.values { - switch name { - case "id": - err := parseSpanID(&span, value) - if err != nil { - return span, err - } - case "customDimensions": - err := parseAttributes(&span, value) - if err != nil { - return span, err - } - case "operation_ParentId": - err := parseParentID(&span, value) - if err != nil { - return span, err - } - case "name": - err := parseName(&span, value) - if err != nil { - return span, err - } - case "timestamp": - err := parseStartTime(&span, value) - if err != nil { - return span, err - } - case "duration": - timeDuration, err := parseDuration(value) - if err != nil { - return span, err - } - - duration = timeDuration - } - } - - span.EndTime = span.StartTime.Add(duration) - return span, nil -} - -func parseSpanID(span *traces.Span, value any) error { - spanID, err := trace.SpanIDFromHex(value.(string)) - if err != nil { - return fmt.Errorf("failed to parse spanId: %w", err) - } - - span.ID = spanID - return nil -} - -func parseAttributes(span *traces.Span, value any) error { - attributes := traces.NewAttributes() - rawAttributes := value.(string) - err := json.Unmarshal([]byte(rawAttributes), &attributes) - if err != nil { - return fmt.Errorf("failed to parse attributes: %w", err) - } - - for key, value := range attributes.Values() { - span.Attributes.Set(key, value) - } - return nil -} - -func parseParentID(span *traces.Span, value any) error { - rawParentID, ok := value.(string) - if ok { - span.Attributes.Set(traces.TracetestMetadataFieldParentID, rawParentID) - } else { - span.Attributes.Set(traces.TracetestMetadataFieldParentID, "") - } - return nil -} - -func parseName(span *traces.Span, value any) error { - rawName, ok := value.(string) - if ok { - span.Name = rawName - } else { - span.Name = "" - } - return nil -} - -func parseStartTime(span *traces.Span, value any) error { - rawStartTime := value.(string) - startTime, err := time.Parse(time.RFC3339Nano, rawStartTime) - if err != nil { - return fmt.Errorf("failed to parse startTime: %w", err) - } - - span.StartTime = startTime - return nil -} - -func parseDuration(value any) (time.Duration, error) { - rawDuration, ok := value.(float64) - if !ok { - return time.Duration(0), fmt.Errorf("failed to parse duration") - } - return time.Duration(rawDuration), nil -} - -type tokenCredentials struct { - accessToken string -} - -func (c *tokenCredentials) GetToken(ctx context.Context, opts policy.TokenRequestOptions) (azcore.AccessToken, error) { - return azcore.AccessToken{Token: c.accessToken}, nil -} diff --git a/agent/workers/datastores/connection/connection.go b/agent/workers/datastores/connection/connection.go deleted file mode 100644 index a0f797edc6..0000000000 --- a/agent/workers/datastores/connection/connection.go +++ /dev/null @@ -1,47 +0,0 @@ -package connection - -import ( - "errors" - "fmt" - "net" - "net/url" - "strings" - "time" - - "github.com/kubeshop/tracetest/server/model" -) - -const reachabilityTimeout = 5 * time.Second - -var ( - ErrTraceNotFound = errors.New("trace not found") - ErrInvalidConfiguration = errors.New("invalid data store configuration") - ErrConnectionFailed = errors.New("could not connect to data store") -) - -func CheckReachability(endpoint string, protocol model.Protocol) error { - if protocol == model.ProtocolHTTP { - address, err := url.Parse(endpoint) - if err != nil { - return err - } - - endpoint = strings.TrimPrefix(endpoint, "http://") - endpoint = strings.TrimPrefix(endpoint, "https://") - - if address.Scheme == "https" && address.Port() == "" { - endpoint = fmt.Sprintf("%s:443", address.Hostname()) - } - - if address.Scheme == "http" && address.Port() == "" { - endpoint = fmt.Sprintf("%s:80", address.Hostname()) - } - } - - _, err := net.DialTimeout("tcp", endpoint, reachabilityTimeout) - if err != nil { - return err - } - - return nil -} diff --git a/agent/workers/datastores/connection/connectivity_step.go b/agent/workers/datastores/connection/connectivity_step.go deleted file mode 100644 index ca968e7b5c..0000000000 --- a/agent/workers/datastores/connection/connectivity_step.go +++ /dev/null @@ -1,61 +0,0 @@ -package connection - -import ( - "context" - "fmt" - "strings" - - "github.com/hashicorp/go-multierror" - "github.com/kubeshop/tracetest/server/model" -) - -type connectivityTestStep struct { - endpoints []string - protocol model.Protocol -} - -var _ TestStep = &connectivityTestStep{} - -func (s *connectivityTestStep) TestConnection(_ context.Context) model.ConnectionTestStep { - unreachableEndpoints := make([]string, 0) - var connectionErr error - for _, endpoint := range s.endpoints { - err := CheckReachability(endpoint, s.protocol) - if err != nil { - unreachableEndpoints = append(unreachableEndpoints, fmt.Sprintf(`"%s"`, endpoint)) - connectionErr = multierror.Append( - connectionErr, - fmt.Errorf("cannot connect to endpoint '%s': %w", endpoint, err), - ) - } - } - - if len(s.endpoints) == 0 { - return model.ConnectionTestStep{ - Message: "Tracetest tried to connect but no endpoints were provided", - Error: fmt.Errorf("no endpoints provided"), - } - } - - if connectionErr != nil { - endpoints := strings.Join(unreachableEndpoints, ", ") - return model.ConnectionTestStep{ - Message: fmt.Sprintf("Tracetest tried to connect to the following endpoints and failed: %s", endpoints), - Status: model.StatusFailed, - Error: connectionErr, - } - } - - endpoints := strings.Join(s.endpoints, ", ") - return model.ConnectionTestStep{ - Message: fmt.Sprintf(`Tracetest connected to %s`, endpoints), - Status: model.StatusPassed, - } -} - -func ConnectivityStep(protocol model.Protocol, endpoints ...string) TestStep { - return &connectivityTestStep{ - endpoints: endpoints, - protocol: protocol, - } -} diff --git a/agent/workers/datastores/connection/options.go b/agent/workers/datastores/connection/options.go deleted file mode 100644 index 136cbbaef2..0000000000 --- a/agent/workers/datastores/connection/options.go +++ /dev/null @@ -1,47 +0,0 @@ -package connection - -import ( - "context" - - "github.com/kubeshop/tracetest/server/model" -) - -func WithPortLintingTest(step TestStep) TesterOption { - return func(t *Tester) { - t.portLinterStep = step - } -} - -func WithConnectivityTest(step TestStep) TesterOption { - return func(t *Tester) { - t.connectivityTestStep = step - } -} - -func WithAuthenticationTest(step TestStep) TesterOption { - return func(t *Tester) { - t.authenticationTestStep = step - } -} - -func WithPollingTest(step TestStep) TesterOption { - return func(t *Tester) { - t.pollingTestStep = step - } -} - -type functionTestStep struct { - fn func(ctx context.Context) (string, error) -} - -func (s *functionTestStep) TestConnection(ctx context.Context) model.ConnectionTestStep { - str, err := s.fn(ctx) - return model.ConnectionTestStep{ - Message: str, - Error: err, - } -} - -func NewTestStep(f func(ctx context.Context) (string, error)) TestStep { - return &functionTestStep{fn: f} -} diff --git a/agent/workers/datastores/connection/port_linting.go b/agent/workers/datastores/connection/port_linting.go deleted file mode 100644 index 70a8563ef2..0000000000 --- a/agent/workers/datastores/connection/port_linting.go +++ /dev/null @@ -1,92 +0,0 @@ -package connection - -import ( - "context" - "fmt" - "regexp" - "strings" - - "github.com/kubeshop/tracetest/server/model" -) - -type portLinter struct { - dataStoreName string - endpoints []string - expectedPorts []string -} - -var _ TestStep = &portLinter{} - -func PortLinter(dataStoreName string, expectedPorts []string, endpoints ...string) TestStep { - return &portLinter{ - dataStoreName: dataStoreName, - endpoints: endpoints, - expectedPorts: expectedPorts, - } -} - -func (s *portLinter) TestConnection(ctx context.Context) model.ConnectionTestStep { - for _, endpoint := range s.endpoints { - port := parsePort(endpoint) - - if !sliceContains(s.expectedPorts, port) { - suggestedPorts := formatAvailablePortsMessage(s.expectedPorts) - return model.ConnectionTestStep{ - Message: fmt.Sprintf(`For %s, port "%s" is not the default port for accessing traces programmatically. Typically, %s uses port %s. If you continue experiencing issues, you may want to verify the correct port to specify.`, s.dataStoreName, port, s.dataStoreName, suggestedPorts), - Status: model.StatusWarning, - } - } - } - - return model.ConnectionTestStep{ - Message: `You are using a commonly used port`, - Status: model.StatusPassed, - } -} - -func sliceContains(slice []string, value string) bool { - for _, item := range slice { - if item == value { - return true - } - } - - return false -} - -// Generates the ports separated by commas and "or" -// ["123"] => 123 -// ["123", "345"] => 123 or 345 -// ["123", "345", "567"] => 123, 345, or 567 -func formatAvailablePortsMessage(ports []string) string { - if len(ports) == 1 { - return ports[0] - } - - allPortsExceptLast := ports[0 : len(ports)-1] - portsSeparatedByComma := strings.Join(allPortsExceptLast, ", ") - lastPort := ports[len(ports)-1] - - if len(ports) == 2 { - return fmt.Sprintf("%s or %s", portsSeparatedByComma, lastPort) - } - - return fmt.Sprintf("%s, or %s", portsSeparatedByComma, lastPort) -} - -var extractPortRegex = regexp.MustCompile("([0-9]+).*") - -func parsePort(url string) string { - index := strings.LastIndex(url, ":") - if index < 0 { - return "" - } - - substring := url[index+1:] - regexGroups := extractPortRegex.FindStringSubmatch(substring) - if len(regexGroups) < 2 { - return "" - } - - return regexGroups[1] -} diff --git a/agent/workers/datastores/connection/port_linting_test.go b/agent/workers/datastores/connection/port_linting_test.go deleted file mode 100644 index f04eb0de92..0000000000 --- a/agent/workers/datastores/connection/port_linting_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package connection_test - -import ( - "context" - "testing" - - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/tracedb/connection" - "github.com/stretchr/testify/assert" -) - -func TestPortLinter(t *testing.T) { - testCases := []struct { - Name string - Endpoints []string - ExpectedPorts []string - ExpectedStatus model.Status - }{ - { - Name: "shouldSucceedIfPortIsExpected", - Endpoints: []string{"jaeger:16685"}, - ExpectedPorts: []string{"16685"}, - ExpectedStatus: model.StatusPassed, - }, - { - Name: "shouldShowWarningInCaseOfDifferentPort", - Endpoints: []string{"jaeger:16686"}, - ExpectedPorts: []string{"16685"}, - ExpectedStatus: model.StatusWarning, - }, - { - Name: "shouldSupportSchemas", - Endpoints: []string{"https://us2.endpoint:9200"}, - ExpectedPorts: []string{"9200"}, - ExpectedStatus: model.StatusPassed, - }, - { - Name: "shouldSupportTwoPorts", - Endpoints: []string{"https://us2.endpoint:9100"}, - ExpectedPorts: []string{"9200", "9250"}, - ExpectedStatus: model.StatusWarning, - }, - { - Name: "shouldSupportTwoPorts", - Endpoints: []string{"https://us2.endpoint:9100"}, - ExpectedPorts: []string{"9200", "9250", "9300"}, - ExpectedStatus: model.StatusWarning, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.Name, func(t *testing.T) { - linter := connection.PortLinter("Jaeger", testCase.ExpectedPorts, testCase.Endpoints...) - result := linter.TestConnection(context.Background()) - assert.Equal(t, testCase.ExpectedStatus, result.Status) - }) - } -} diff --git a/agent/workers/datastores/connection/tester.go b/agent/workers/datastores/connection/tester.go deleted file mode 100644 index 80a102fe35..0000000000 --- a/agent/workers/datastores/connection/tester.go +++ /dev/null @@ -1,59 +0,0 @@ -package connection - -import ( - "context" - - "github.com/kubeshop/tracetest/server/model" -) - -type TestStep interface { - TestConnection(ctx context.Context) model.ConnectionTestStep -} - -type TesterOption func(*Tester) - -type Tester struct { - portLinterStep TestStep - connectivityTestStep TestStep - authenticationTestStep TestStep - pollingTestStep TestStep -} - -func NewTester(opts ...TesterOption) Tester { - tester := Tester{} - - for _, opt := range opts { - opt(&tester) - } - - return tester -} - -func (t *Tester) TestConnection(ctx context.Context) (res model.ConnectionResult) { - if t.portLinterStep != nil { - res.PortCheck = t.portLinterStep.TestConnection(ctx) - if res.PortCheck.Error != nil { - res.PortCheck.Status = model.StatusFailed - return - } - } - - res.Connectivity = t.connectivityTestStep.TestConnection(ctx) - if res.Connectivity.Error != nil { - res.Connectivity.Status = model.StatusFailed - return - } - - res.Authentication = t.authenticationTestStep.TestConnection(ctx) - if res.Authentication.Error != nil { - res.Authentication.Status = model.StatusFailed - return - } - - res.FetchTraces = t.pollingTestStep.TestConnection(ctx) - if res.FetchTraces.Error != nil { - res.FetchTraces.Status = model.StatusFailed - } - - return -} diff --git a/agent/workers/datastores/connection/trace_polling_step.go b/agent/workers/datastores/connection/trace_polling_step.go deleted file mode 100644 index 36751cc45f..0000000000 --- a/agent/workers/datastores/connection/trace_polling_step.go +++ /dev/null @@ -1,40 +0,0 @@ -package connection - -import ( - "context" - "errors" - - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/traces" - "go.opentelemetry.io/otel/trace" -) - -type DataStore interface { - GetTraceByID(context.Context, string) (traces.Trace, error) - GetTraceID() trace.TraceID -} - -type tracePollingTestStep struct { - dataStore DataStore -} - -func (s *tracePollingTestStep) TestConnection(ctx context.Context) model.ConnectionTestStep { - _, err := s.dataStore.GetTraceByID(ctx, s.dataStore.GetTraceID().String()) - if !errors.Is(err, ErrTraceNotFound) { - return model.ConnectionTestStep{ - Message: "Tracetest could not get traces back from the data store", - Error: err, - Status: model.StatusFailed, - } - } - - return model.ConnectionTestStep{ - Message: "Traces were obtained successfully", - Error: nil, - Status: model.StatusPassed, - } -} - -func TracePollingTestStep(ds DataStore) TestStep { - return &tracePollingTestStep{ds} -} diff --git a/agent/workers/datastores/datasource/datasource.go b/agent/workers/datastores/datasource/datasource.go deleted file mode 100644 index e6d2143017..0000000000 --- a/agent/workers/datastores/datasource/datasource.go +++ /dev/null @@ -1,61 +0,0 @@ -package datasource - -import ( - "context" - - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/traces" - "google.golang.org/grpc" -) - -type SupportedDataSource string - -var ( - GRPC SupportedDataSource = "grpc" - HTTP SupportedDataSource = "http" -) - -type HttpCallback func(ctx context.Context, traceID string, client *HttpClient) (traces.Trace, error) -type GrpcCallback func(ctx context.Context, traceID string, connection *grpc.ClientConn) (traces.Trace, error) - -type Callbacks struct { - GRPC GrpcCallback - HTTP HttpCallback -} - -type DataSource interface { - Endpoint() string - Connect(ctx context.Context) error - Ready() bool - GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) - TestConnection(ctx context.Context) model.ConnectionTestStep - Close() error -} - -type noopDataSource struct{} - -func (dataSource *noopDataSource) GetTraceByID(ctx context.Context, traceID string) (t traces.Trace, err error) { - return traces.Trace{}, nil -} -func (db *noopDataSource) Endpoint() string { return "" } -func (db *noopDataSource) Connect(ctx context.Context) error { return nil } -func (db *noopDataSource) Close() error { return nil } -func (db *noopDataSource) Ready() bool { return true } -func (db *noopDataSource) TestConnection(ctx context.Context) model.ConnectionTestStep { - return model.ConnectionTestStep{} -} - -func New(name string, cfg *datastore.MultiChannelClientConfig, callbacks Callbacks) DataSource { - sourceType := SupportedDataSource(cfg.Type) - - switch sourceType { - default: - case GRPC: - return NewGrpcClient(name, cfg.Grpc, callbacks.GRPC) - case HTTP: - return NewHttpClient(name, cfg.Http, callbacks.HTTP) - } - - return &noopDataSource{} -} diff --git a/agent/workers/datastores/datasource/grpc.go b/agent/workers/datastores/datasource/grpc.go deleted file mode 100644 index 00981da66c..0000000000 --- a/agent/workers/datastores/datasource/grpc.go +++ /dev/null @@ -1,134 +0,0 @@ -package datasource - -import ( - "context" - "fmt" - - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/tracedb/connection" - "github.com/kubeshop/tracetest/server/traces" - "github.com/pkg/errors" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configcompression" - "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/config/configopaque" - "go.opentelemetry.io/collector/config/configtls" - "google.golang.org/grpc" -) - -type GrpcClient struct { - name string - config *configgrpc.GRPCClientSettings - conn *grpc.ClientConn - callback GrpcCallback -} - -func convertDomainConfigToOpenTelemetryConfig(config *datastore.GRPCClientSettings) *configgrpc.GRPCClientSettings { - headers := make(map[string]configopaque.String) - for name, value := range config.Headers { - headers[name] = configopaque.String(value) - } - - // manual map domain fields to OTel fields - otelConfig := &configgrpc.GRPCClientSettings{ - Endpoint: config.Endpoint, - ReadBufferSize: config.ReadBufferSize, - WriteBufferSize: config.WriteBufferSize, - WaitForReady: config.WaitForReady, - Headers: headers, - BalancerName: config.BalancerName, - - Compression: configcompression.CompressionType(config.Compression), - } - - if config.TLS == nil { - return otelConfig - } - - otelConfig.TLSSetting = configtls.TLSClientSetting{ - Insecure: config.TLS.Insecure, - InsecureSkipVerify: config.TLS.InsecureSkipVerify, - ServerName: config.TLS.ServerName, - } - - if config.TLS.Settings == nil { - return otelConfig - } - - otelConfig.TLSSetting.TLSSetting = configtls.TLSSetting{ - CAFile: config.TLS.Settings.CAFile, - CertFile: config.TLS.Settings.CertFile, - KeyFile: config.TLS.Settings.KeyFile, - MinVersion: config.TLS.Settings.MinVersion, - MaxVersion: config.TLS.Settings.MaxVersion, - } - - return otelConfig -} - -func NewGrpcClient(name string, config *datastore.GRPCClientSettings, callback GrpcCallback) DataSource { - otelConfig := convertDomainConfigToOpenTelemetryConfig(config) - - return &GrpcClient{ - name: name, - config: otelConfig, - callback: callback, - } -} - -func (client *GrpcClient) Ready() bool { - return client.conn != nil -} - -func (client *GrpcClient) GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) { - return client.callback(ctx, traceID, client.conn) -} - -func (client *GrpcClient) Endpoint() string { - return client.config.Endpoint -} - -func (client *GrpcClient) Connect(ctx context.Context) error { - conn, err := client.config.ToClientConn(ctx, nil, component.TelemetrySettings{}) - if err != nil { - return errors.Wrap(connection.ErrConnectionFailed, err.Error()) - } - - client.conn = conn - return nil -} - -func (client *GrpcClient) TestConnection(ctx context.Context) model.ConnectionTestStep { - connectionTestResult := model.ConnectionTestStep{ - Message: fmt.Sprintf(`Tracetest connected to "%s"`, client.config.Endpoint), - } - - err := connection.CheckReachability(client.config.Endpoint, model.ProtocolGRPC) - if err != nil { - return model.ConnectionTestStep{ - Message: fmt.Sprintf(`Tracetest tried to connect to "%s" and failed`, client.config.Endpoint), - Error: err, - } - } - - err = client.Connect(ctx) - wrappedErr := errors.Unwrap(err) - if errors.Is(wrappedErr, connection.ErrConnectionFailed) { - return model.ConnectionTestStep{ - Message: fmt.Sprintf(`Tracetest tried to open a gRPC connection against "%s" and failed`, client.config.Endpoint), - Error: err, - } - } - - return connectionTestResult -} - -func (client *GrpcClient) Close() error { - err := client.conn.Close() - if err != nil { - return fmt.Errorf("GRPC close: %w", err) - } - - return nil -} diff --git a/agent/workers/datastores/datasource/http.go b/agent/workers/datastores/datasource/http.go deleted file mode 100644 index 0315018af4..0000000000 --- a/agent/workers/datastores/datasource/http.go +++ /dev/null @@ -1,147 +0,0 @@ -package datasource - -import ( - "bytes" - "context" - "crypto/tls" - "crypto/x509" - "errors" - "fmt" - "io" - "net/http" - "strings" - - "github.com/goware/urlx" - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/tracedb/connection" - "github.com/kubeshop/tracetest/server/traces" -) - -type HttpClient struct { - name string - config *http.Request - client *http.Client - callback HttpCallback -} - -func NewHttpClient(name string, config *datastore.HttpClientConfig, callback HttpCallback) DataSource { - endpoint, _ := urlx.Parse(config.Url) - client := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: getTlsConfig(config.TLS), - }, - } - - header := http.Header{} - for key, value := range config.Headers { - header.Set(key, value) - } - - request := &http.Request{ - URL: endpoint, - Header: header, - } - - return &HttpClient{ - name: name, - config: request, - client: client, - callback: callback, - } -} - -func (client *HttpClient) Ready() bool { - return client.callback != nil -} - -func (client *HttpClient) Close() error { - return nil -} - -func (client *HttpClient) GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) { - return client.callback(ctx, traceID, client) -} - -func (client *HttpClient) Endpoint() string { - return client.config.URL.String() -} - -func (client *HttpClient) Connect(ctx context.Context) error { - _, err := client.client.Transport.RoundTrip(client.config) - - return err -} - -func (client *HttpClient) TestConnection(ctx context.Context) model.ConnectionTestStep { - connectionTestResult := model.ConnectionTestStep{ - Message: fmt.Sprintf(`Tracetest connected to "%s"`, client.config.URL.String()), - } - - err := connection.CheckReachability(client.config.URL.String(), model.ProtocolHTTP) - if err != nil { - return model.ConnectionTestStep{ - Message: fmt.Sprintf(`Tracetest tried to connect to "%s" and failed`, client.config.URL.String()), - Error: err, - } - } - - err = client.Connect(ctx) - wrappedErr := errors.Unwrap(err) - if errors.Is(wrappedErr, connection.ErrConnectionFailed) { - return model.ConnectionTestStep{ - Message: fmt.Sprintf(`Tracetest tried to open a connection against "%s" and failed`, client.config.URL.String()), - Error: err, - } - } - - return connectionTestResult -} - -func (client *HttpClient) Request(ctx context.Context, path, method, body string) (*http.Response, error) { - url := fmt.Sprintf("%s%s", client.config.URL.String(), path) - var readerBody io.Reader - if body != "" { - readerBody = bytes.NewBufferString(body) - } - - request, err := http.NewRequestWithContext(ctx, strings.ToUpper(method), url, readerBody) - if err != nil { - return nil, err - } - - request.Header = client.config.Header - response, err := client.client.Do(request) - if err != nil { - return nil, err - } - - return response, nil -} - -func getTlsConfig(dataStoreTls *datastore.TLS) *tls.Config { - tlsConfig := tls.Config{} - - if dataStoreTls == nil { - return &tlsConfig - } - - if dataStoreTls.Insecure { - tlsConfig.InsecureSkipVerify = true - } - - if dataStoreTls.Settings == nil { - return &tlsConfig - } - - caCertFile := dataStoreTls.Settings.CAFile - - if caCertFile != "" { - caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM([]byte(caCertFile)) - tlsConfig.RootCAs = caCertPool - tlsConfig.BuildNameToCertificate() - } - - return &tlsConfig -} diff --git a/agent/workers/datastores/datastores.go b/agent/workers/datastores/datastores.go deleted file mode 100644 index 44a8ca1b89..0000000000 --- a/agent/workers/datastores/datastores.go +++ /dev/null @@ -1,113 +0,0 @@ -package datastores - -import ( - "context" - "fmt" - - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/pkg/id" - "github.com/kubeshop/tracetest/server/traces" - "go.opentelemetry.io/otel/trace" -) - -var IDGen = id.NewRandGenerator() - -type DataStore interface { - Connect(ctx context.Context) error - Ready() bool - ShouldRetry() bool - GetTraceID() trace.TraceID - GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) - Close() error - GetEndpoints() string -} - -type TestableDataStore interface { - DataStore - TestConnection(ctx context.Context) model.ConnectionResult -} - -type noopDataStore struct{} - -func (db *noopDataStore) GetTraceByID(ctx context.Context, traceID string) (t traces.Trace, err error) { - return traces.Trace{}, nil -} - -func (db *noopDataStore) GetTraceID() trace.TraceID { - return IDGen.TraceID() -} -func (db *noopDataStore) Connect(ctx context.Context) error { return nil } -func (db *noopDataStore) Close() error { return nil } -func (db *noopDataStore) ShouldRetry() bool { return false } -func (db *noopDataStore) Ready() bool { return true } -func (db *noopDataStore) GetEndpoints() string { return "" } -func (db *noopDataStore) TestConnection(ctx context.Context) model.ConnectionResult { - return model.ConnectionResult{} -} - -type dataStoreFactory struct{} - -func Factory() func(ds datastore.DataStore) (DataStore, error) { - f := dataStoreFactory{} - - return f.New -} - -func (f *dataStoreFactory) getDatastoreInstance(ds datastore.DataStore) (DataStore, error) { - var tdb DataStore - var err error - - switch ds.Type { - case datastore.DataStoreTypeJaeger: - tdb, err = newJaegerDB(ds.Values.Jaeger) - case datastore.DataStoreTypeTempo: - tdb, err = newTempoDB(ds.Values.Tempo) - case datastore.DataStoreTypeElasticAPM: - tdb, err = newElasticSearchDB(ds.Values.ElasticApm) - case datastore.DataStoreTypeOpenSearch: - tdb, err = newOpenSearchDB(ds.Values.OpenSearch) - case datastore.DataStoreTypeSignalFX: - tdb, err = newSignalFXDB(ds.Values.SignalFx) - case datastore.DataStoreTypeAwsXRay: - tdb, err = NewAwsXRayDB(ds.Values.AwsXRay) - case datastore.DatastoreTypeAzureAppInsights: - tdb, err = NewAzureAppInsightsDB(ds.Values.AzureAppInsights) - case datastore.DatastoreTypeSumoLogic: - tdb, err = NewSumoLogicDB(ds.Values.SumoLogic) - default: - return &noopDataStore{}, nil - } - - if err != nil { - return nil, err - } - - if tdb == nil { - return nil, fmt.Errorf("data store unknown: %s", ds.Type) - } - - return tdb, err -} - -func (f *dataStoreFactory) New(ds datastore.DataStore) (DataStore, error) { - datastore, err := f.getDatastoreInstance(ds) - - if err != nil { - return nil, err - } - - err = datastore.Connect(context.Background()) - if err != nil { - return nil, fmt.Errorf("cannot connect to datasource: %w", err) - } - - return datastore, nil -} - -type realDataStore struct{} - -func (db *realDataStore) ShouldRetry() bool { return true } -func (db *realDataStore) GetTraceID() trace.TraceID { - return IDGen.TraceID() -} diff --git a/agent/workers/datastores/datastores_test.go b/agent/workers/datastores/datastores_test.go deleted file mode 100644 index 4aa819feb2..0000000000 --- a/agent/workers/datastores/datastores_test.go +++ /dev/null @@ -1,143 +0,0 @@ -package datastores_test - -import ( - "fmt" - "testing" - - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/tracedb" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestCreateClient(t *testing.T) { - cases := []struct { - name string - ds datastore.DataStore - expectedType string - expectedError error - }{ - { - name: "Jaeger", - ds: datastore.DataStore{ - Type: datastore.DataStoreTypeJaeger, - Values: datastore.DataStoreValues{ - Jaeger: &datastore.GRPCClientSettings{ - Endpoint: "notexists:123", - }, - }, - }, - expectedType: "*tracedb.jaegerTraceDB", - }, - { - name: "Tempo", - ds: datastore.DataStore{ - Type: datastore.DataStoreTypeTempo, - Values: datastore.DataStoreValues{ - Tempo: &datastore.MultiChannelClientConfig{}, - }, - }, - expectedType: "*tracedb.tempoTraceDB", - }, - { - name: "ElasticSearch", - ds: datastore.DataStore{ - Type: datastore.DataStoreTypeElasticAPM, - Values: datastore.DataStoreValues{ - ElasticApm: &datastore.ElasticSearchConfig{}, - }, - }, - expectedType: "*tracedb.elasticsearchDB", - }, - { - name: "OpenSearch", - ds: datastore.DataStore{ - Type: datastore.DataStoreTypeOpenSearch, - Values: datastore.DataStoreValues{ - OpenSearch: &datastore.ElasticSearchConfig{}, - }, - }, - expectedType: "*tracedb.opensearchDB", - }, - { - name: "SignalFX", - ds: datastore.DataStore{ - Type: datastore.DataStoreTypeSignalFX, - Values: datastore.DataStoreValues{ - SignalFx: &datastore.SignalFXConfig{}, - }, - }, - expectedType: "*tracedb.signalfxDB", - }, - { - name: "AWSXRay", - ds: datastore.DataStore{ - Type: datastore.DataStoreTypeAwsXRay, - Values: datastore.DataStoreValues{ - AwsXRay: &datastore.AWSXRayConfig{}, - }, - }, - expectedType: "*tracedb.awsxrayDB", - }, - { - name: "OTLP", - ds: datastore.DataStore{ - Type: datastore.DataStoreTypeOTLP, - Values: datastore.DataStoreValues{}, - }, - expectedType: "*tracedb.OTLPTraceDB", - }, - { - name: "NewRelic", - ds: datastore.DataStore{ - Type: datastore.DataStoreTypeNewRelic, - Values: datastore.DataStoreValues{}, - }, - expectedType: "*tracedb.OTLPTraceDB", - }, - { - name: "Lightstep", - ds: datastore.DataStore{ - Type: datastore.DataStoreTypeLighStep, - Values: datastore.DataStoreValues{}, - }, - expectedType: "*tracedb.OTLPTraceDB", - }, - { - name: "Honeycomb", - ds: datastore.DataStore{ - Type: datastore.DataStoreTypeHoneycomb, - Values: datastore.DataStoreValues{}, - }, - expectedType: "*tracedb.OTLPTraceDB", - }, - { - name: "DataDog", - ds: datastore.DataStore{ - Type: datastore.DataStoreTypeDataDog, - Values: datastore.DataStoreValues{}, - }, - expectedType: "*tracedb.OTLPTraceDB", - }, - { - name: "EmptyConfig", - ds: datastore.DataStore{}, - expectedType: "*tracedb.noopTraceDB", - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - cl := c - t.Parallel() - - newFn := tracedb.Factory(nil) - - actual, err := newFn(cl.ds) - - require.NoError(t, err) - assert.Equal(t, cl.expectedType, fmt.Sprintf("%T", actual)) - }) - } - -} diff --git a/agent/workers/datastores/elasticsearchdb.go b/agent/workers/datastores/elasticsearchdb.go deleted file mode 100644 index 50e7b1f47c..0000000000 --- a/agent/workers/datastores/elasticsearchdb.go +++ /dev/null @@ -1,298 +0,0 @@ -package datastores - -import ( - "context" - "crypto/tls" - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "reflect" - "strconv" - "strings" - "time" - - "github.com/elastic/go-elasticsearch/v8" - "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/tracedb/connection" - "github.com/kubeshop/tracetest/server/traces" - "go.opentelemetry.io/otel/trace" -) - -func elasticSearchDefaultPorts() []string { - return []string{"9200"} -} - -type elasticsearchDB struct { - realDataStore - config *datastore.ElasticSearchConfig - client *elasticsearch.Client -} - -func (db *elasticsearchDB) Connect(ctx context.Context) error { - return nil -} - -func (db *elasticsearchDB) Close() error { - // No need to close this db - return nil -} - -func (db *elasticsearchDB) GetEndpoints() string { - return strings.Join(db.config.Addresses, ", ") -} - -func (db *elasticsearchDB) TestConnection(ctx context.Context) model.ConnectionResult { - tester := connection.NewTester( - connection.WithPortLintingTest(connection.PortLinter("ElasticSearch", elasticSearchDefaultPorts(), db.config.Addresses...)), - connection.WithConnectivityTest(connection.ConnectivityStep(model.ProtocolHTTP, db.config.Addresses...)), - connection.WithPollingTest(connection.TracePollingTestStep(db)), - connection.WithAuthenticationTest(connection.NewTestStep(func(ctx context.Context) (string, error) { - _, err := getClusterInfo(db.client) - if err != nil { - return "Tracetest tried to execute an ElasticSearch API request but it failed due to authentication issues", err - } - - return "Tracetest managed to authenticate with ElasticSearch", nil - })), - ) - - return tester.TestConnection(ctx) -} - -func (db *elasticsearchDB) Ready() bool { - return db.client != nil -} - -func (db *elasticsearchDB) GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) { - if !db.Ready() { - return traces.Trace{}, fmt.Errorf("ElasticSearch dataStore not ready") - } - content := strings.NewReader(fmt.Sprintf(`{ - "query": { "match": { "trace.id": "%s" } } - }`, traceID)) - - searchRequest := esapi.SearchRequest{ - Index: []string{db.config.Index}, - Body: content, - Pretty: true, - } - - response, err := searchRequest.Do(ctx, db.client) - if err != nil { - return traces.Trace{}, fmt.Errorf("could not execute search request: %w", err) - } - defer response.Body.Close() - - responseBody, err := ioutil.ReadAll(response.Body) - if err != nil { - return traces.Trace{}, fmt.Errorf("could not read response body") - } - - var searchResponse searchResponse - err = json.Unmarshal(responseBody, &searchResponse) - if err != nil { - return traces.Trace{}, fmt.Errorf("could not unmarshal search response into struct: %w", err) - } - - if len(searchResponse.Hits.Results) == 0 { - return traces.Trace{}, connection.ErrTraceNotFound - } - - return convertElasticSearchFormatIntoTrace(traceID, searchResponse), nil -} - -func newElasticSearchDB(cfg *datastore.ElasticSearchConfig) (DataStore, error) { - var caCert []byte - if cfg.Certificate != "" { - caCert = []byte(cfg.Certificate) - } - - client, err := elasticsearch.NewClient(elasticsearch.Config{ - Addresses: cfg.Addresses, - Username: cfg.Username, - Password: cfg.Password, - CACert: caCert, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: cfg.InsecureSkipVerify, - }, - }, - }) - if err != nil { - return nil, fmt.Errorf("could not create elasticsearch client: %w", err) - } - - return &elasticsearchDB{ - config: cfg, - client: client, - }, nil -} - -func getClusterInfo(client *elasticsearch.Client) (string, error) { - var r map[string]interface{} - res, err := client.Info() - if err != nil { - return "", fmt.Errorf("error getting cluster info response: %s", err) - } - defer res.Body.Close() - - // Check response status - if res.IsError() { - return "", fmt.Errorf("error getting cluster info response status: %s", res.String()) - } - // Deserialize the response into a map - if err := json.NewDecoder(res.Body).Decode(&r); err != nil { - return "", fmt.Errorf("error parsing cluster info response: %s", err) - } - - // Return client version number - info := fmt.Sprintf("Server: %s", r["version"].(map[string]interface{})["number"]) - return info, nil -} - -func convertElasticSearchFormatIntoTrace(traceID string, searchResponse searchResponse) traces.Trace { - spans := make([]traces.Span, 0) - for _, result := range searchResponse.Hits.Results { - span := convertElasticSearchSpanIntoSpan(result.Source) - spans = append(spans, span) - } - - return traces.NewTrace(traceID, spans) -} - -func convertElasticSearchSpanIntoSpan(input map[string]interface{}) traces.Span { - opts := &FlattenOptions{Delimiter: "."} - flatInput, _ := flatten(opts.Prefix, 0, input, opts) - - // SpanId - transactionId := flatInput["transaction.id"] - spanId := flatInput["span.id"] - var id trace.SpanID - if transactionId != nil { - id, _ = trace.SpanIDFromHex((transactionId).(string)) - } - if spanId != nil { - id, _ = trace.SpanIDFromHex((spanId).(string)) - } - - // SpanName - transactionName := flatInput["transaction.name"] - spanName := flatInput["span.name"] - var name string - if transactionName != nil { - name = transactionName.(string) - } - if spanName != nil { - name = spanName.(string) - } - - // Duration - transactionDuration := flatInput["transaction.duration.us"] - spanDuration := flatInput["span.duration.us"] - var duration float64 - if transactionDuration != nil { - duration = transactionDuration.(float64) - } - if spanDuration != nil { - duration = spanDuration.(float64) - } - - // Timestamps - startTime, _ := time.Parse(time.RFC3339, flatInput["@timestamp"].(string)) - endTime := startTime.Add(time.Microsecond * time.Duration(duration)) - - // Attributes - attributes := traces.NewAttributes() - - for attrName, attrValue := range flatInput { - name := attrName - name = strings.ReplaceAll(name, "transaction.", "") - name = strings.ReplaceAll(name, "span.", "") - attributes.Set(name, fmt.Sprintf("%v", attrValue)) - } - - // ParentId - parentId := flatInput["parent.id"] - if parentId != nil { - attributes.Set(traces.TracetestMetadataFieldParentID, flatInput["parent.id"].(string)) - } - - return traces.Span{ - ID: id, - Name: name, - StartTime: startTime, - EndTime: endTime, - Attributes: attributes, - Parent: nil, - Children: []*traces.Span{}, - } -} - -type FlattenOptions struct { - Prefix string - Delimiter string - Safe bool - MaxDepth int -} - -func flatten(prefix string, depth int, nested interface{}, opts *FlattenOptions) (flatmap map[string]interface{}, err error) { - flatmap = make(map[string]interface{}) - - switch nested := nested.(type) { - case map[string]interface{}: - if opts.MaxDepth != 0 && depth >= opts.MaxDepth { - flatmap[prefix] = nested - return - } - if reflect.DeepEqual(nested, map[string]interface{}{}) { - flatmap[prefix] = nested - return - } - for k, v := range nested { - // create new key - newKey := k - if prefix != "" { - newKey = prefix + opts.Delimiter + newKey - } - fm1, fe := flatten(newKey, depth+1, v, opts) - if fe != nil { - err = fe - return - } - update(flatmap, fm1) - } - case []interface{}: - if opts.Safe { - flatmap[prefix] = nested - return - } - if reflect.DeepEqual(nested, []interface{}{}) { - flatmap[prefix] = nested - return - } - for i, v := range nested { - newKey := strconv.Itoa(i) - if prefix != "" { - newKey = prefix + opts.Delimiter + newKey - } - fm1, fe := flatten(newKey, depth+1, v, opts) - if fe != nil { - err = fe - return - } - update(flatmap, fm1) - } - default: - flatmap[prefix] = nested - } - return -} - -func update(to map[string]interface{}, from map[string]interface{}) { - for kt, vt := range from { - to[kt] = vt - } -} diff --git a/agent/workers/datastores/jaegerdb.go b/agent/workers/datastores/jaegerdb.go deleted file mode 100644 index 3920939133..0000000000 --- a/agent/workers/datastores/jaegerdb.go +++ /dev/null @@ -1,121 +0,0 @@ -package datastores - -import ( - "context" - "fmt" - "io" - "strings" - - pb "github.com/kubeshop/tracetest/agent/internal/proto-gen-go/api_v3" - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/pkg/id" - "github.com/kubeshop/tracetest/server/tracedb/connection" - "github.com/kubeshop/tracetest/server/tracedb/datasource" - "github.com/kubeshop/tracetest/server/traces" - v1 "go.opentelemetry.io/proto/otlp/trace/v1" - "google.golang.org/grpc" - "google.golang.org/grpc/status" -) - -func jaegerDefaultPorts() []string { - return []string{"16685"} -} - -type jaegerTraceDB struct { - realDataStore - dataSource datasource.DataSource -} - -func newJaegerDB(grpcConfig *datastore.GRPCClientSettings) (DataStore, error) { - baseConfig := &datastore.MultiChannelClientConfig{ - Type: datastore.MultiChannelClientTypeGRPC, - Grpc: grpcConfig, - } - - dataSource := datasource.New("Jaeger", baseConfig, datasource.Callbacks{ - GRPC: jaegerGrpcGetTraceByID, - }) - - return &jaegerTraceDB{ - dataSource: dataSource, - }, nil -} - -func (jtd *jaegerTraceDB) Connect(ctx context.Context) error { - return jtd.dataSource.Connect(ctx) -} - -func (jtd *jaegerTraceDB) GetEndpoints() string { - return jtd.dataSource.Endpoint() -} - -func (jtd *jaegerTraceDB) TestConnection(ctx context.Context) model.ConnectionResult { - tester := connection.NewTester( - connection.WithPortLintingTest(connection.PortLinter("Jaeger", jaegerDefaultPorts(), jtd.dataSource.Endpoint())), - connection.WithConnectivityTest(jtd.dataSource), - connection.WithPollingTest(connection.TracePollingTestStep(jtd)), - connection.WithAuthenticationTest(connection.NewTestStep(func(ctx context.Context) (string, error) { - _, err := jtd.GetTraceByID(ctx, id.NewRandGenerator().TraceID().String()) - if strings.Contains(err.Error(), "authentication handshake failed") { - return "Tracetest tried to execute a gRPC request but it failed due to authentication issues", err - } - - return "Tracetest managed to authenticate with Jaeger", nil - })), - ) - - return tester.TestConnection(ctx) -} - -func (jtd *jaegerTraceDB) GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) { - trace, err := jtd.dataSource.GetTraceByID(ctx, traceID) - return trace, err -} - -func (jtd *jaegerTraceDB) Ready() bool { - return jtd.dataSource.Ready() -} - -func (jtd *jaegerTraceDB) Close() error { - return jtd.dataSource.Close() -} - -func jaegerGrpcGetTraceByID(ctx context.Context, traceID string, conn *grpc.ClientConn) (traces.Trace, error) { - query := pb.NewQueryServiceClient(conn) - - stream, err := query.GetTrace(ctx, &pb.GetTraceRequest{ - TraceId: traceID, - }) - if err != nil { - return traces.Trace{}, fmt.Errorf("jaeger get trace: %w", err) - } - - // jaeger-query v3 API returns otel spans - var spans []*v1.ResourceSpans - for { - in, err := stream.Recv() - if err == io.EOF { - break - } - - if err != nil { - st, ok := status.FromError(err) - if !ok { - return traces.Trace{}, fmt.Errorf("jaeger stream recv: %w", err) - } - if st.Message() == "trace not found" { - return traces.Trace{}, connection.ErrTraceNotFound - } - return traces.Trace{}, fmt.Errorf("jaeger stream recv err: %w", err) - } - - spans = append(spans, in.ResourceSpans...) - } - - trace := &v1.TracesData{ - ResourceSpans: spans, - } - - return traces.FromOtel(trace), nil -} diff --git a/agent/workers/datastores/opensearchdb.go b/agent/workers/datastores/opensearchdb.go deleted file mode 100644 index bc4a44625f..0000000000 --- a/agent/workers/datastores/opensearchdb.go +++ /dev/null @@ -1,188 +0,0 @@ -package datastores - -import ( - "context" - "crypto/tls" - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "strings" - "time" - - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/tracedb/connection" - "github.com/kubeshop/tracetest/server/traces" - "github.com/opensearch-project/opensearch-go" - "github.com/opensearch-project/opensearch-go/opensearchapi" - "go.opentelemetry.io/otel/trace" -) - -func opensearchDefaultPorts() []string { - return []string{"9200", "9250"} -} - -type opensearchDB struct { - realDataStore - config *datastore.ElasticSearchConfig - client *opensearch.Client -} - -func (db *opensearchDB) Connect(ctx context.Context) error { - return nil -} - -func (db *opensearchDB) Close() error { - // No need to close this db - return nil -} - -func (db *opensearchDB) GetEndpoints() string { - return strings.Join(db.config.Addresses, ", ") -} - -func (db *opensearchDB) TestConnection(ctx context.Context) model.ConnectionResult { - tester := connection.NewTester( - connection.WithPortLintingTest(connection.PortLinter("OpenSearch", opensearchDefaultPorts(), db.config.Addresses...)), - connection.WithConnectivityTest(connection.ConnectivityStep(model.ProtocolHTTP, db.config.Addresses...)), - connection.WithPollingTest(connection.TracePollingTestStep(db)), - connection.WithAuthenticationTest(connection.NewTestStep(func(ctx context.Context) (string, error) { - _, err := db.GetTraceByID(ctx, trace.TraceID{}.String()) - if strings.Contains(strings.ToLower(err.Error()), "unauthorized") { - return "Tracetest tried to execute an OpenSearch API request but it failed due to authentication issues", err - } - - return "Tracetest managed to authenticate with OpenSearch", nil - })), - ) - - return tester.TestConnection(ctx) -} - -func (db *opensearchDB) Ready() bool { - return db.client != nil -} - -func (db *opensearchDB) GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) { - if !db.Ready() { - return traces.Trace{}, fmt.Errorf("OpenSearch dataStore not ready") - } - content := strings.NewReader(fmt.Sprintf(`{ - "query": { "match": { "traceId": "%s" } } - }`, traceID)) - - searchRequest := opensearchapi.SearchRequest{ - Index: []string{db.config.Index}, - Body: content, - } - - response, err := searchRequest.Do(ctx, db.client) - if err != nil { - return traces.Trace{}, fmt.Errorf("could not execute search request: %w", err) - } - - responseBody, err := ioutil.ReadAll(response.Body) - if err != nil { - return traces.Trace{}, fmt.Errorf("could not read response body") - } - - var searchResponse searchResponse - err = json.Unmarshal(responseBody, &searchResponse) - if err != nil { - return traces.Trace{}, fmt.Errorf("could not unmarshal search response into struct: %w", err) - } - - if len(searchResponse.Hits.Results) == 0 { - return traces.Trace{}, connection.ErrTraceNotFound - } - - return convertOpensearchFormatIntoTrace(traceID, searchResponse), nil -} - -func newOpenSearchDB(cfg *datastore.ElasticSearchConfig) (DataStore, error) { - var caCert []byte - if cfg.Certificate != "" { - caCert = []byte(cfg.Certificate) - } - - client, err := opensearch.NewClient(opensearch.Config{ - Addresses: cfg.Addresses, - Username: cfg.Username, - Password: cfg.Password, - CACert: caCert, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: cfg.InsecureSkipVerify, - }, - }, - }) - - if err != nil { - return nil, fmt.Errorf("could not create opensearch client: %w", err) - } - - return &opensearchDB{ - config: cfg, - client: client, - }, nil -} - -func convertOpensearchFormatIntoTrace(traceID string, searchResponse searchResponse) traces.Trace { - spans := make([]traces.Span, 0) - for _, result := range searchResponse.Hits.Results { - span := convertOpensearchSpanIntoSpan(result.Source) - spans = append(spans, span) - } - - return traces.NewTrace(traceID, spans) -} - -func convertOpensearchSpanIntoSpan(input map[string]interface{}) traces.Span { - spanId, _ := trace.SpanIDFromHex(input["spanId"].(string)) - - startTime, _ := time.Parse(time.RFC3339, input["startTime"].(string)) - endTime, _ := time.Parse(time.RFC3339, input["endTime"].(string)) - - attributes := traces.NewAttributes() - - for attrName, attrValue := range input { - if !strings.HasPrefix(attrName, "span.attributes.") && !strings.HasPrefix(attrName, "resource.attributes.") { - // Not an attribute we care about - continue - } - - name := attrName - name = strings.ReplaceAll(name, "span.attributes.", "") - name = strings.ReplaceAll(name, "resource.attributes.", "") - // Opensearch's data-prepper replaces "." with "@". We have to revert it. Example: - // "service.name" becomes "service@name" - name = strings.ReplaceAll(name, "@", ".") - attributes.Set(name, fmt.Sprintf("%v", attrValue)) - } - - attributes.Set(traces.TracetestMetadataFieldKind, input["kind"].(string)) - attributes.Set(traces.TracetestMetadataFieldKind, input["parentSpanId"].(string)) - - return traces.Span{ - ID: spanId, - Name: input["name"].(string), - StartTime: startTime, - EndTime: endTime, - Attributes: attributes, - Parent: nil, - Children: []*traces.Span{}, - } -} - -type searchResponse struct { - Hits searchHits `json:"hits"` -} - -type searchHits struct { - Results []searchResult `json:"hits"` -} - -type searchResult struct { - Source map[string]interface{} `json:"_source"` -} diff --git a/agent/workers/datastores/signalfxdb.go b/agent/workers/datastores/signalfxdb.go deleted file mode 100644 index 5094913654..0000000000 --- a/agent/workers/datastores/signalfxdb.go +++ /dev/null @@ -1,215 +0,0 @@ -package datastores - -import ( - "context" - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "strconv" - "strings" - "time" - - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/tracedb/connection" - "github.com/kubeshop/tracetest/server/traces" - "go.opentelemetry.io/otel/trace" -) - -type signalfxDB struct { - realDataStore - - Token string - Realm string - URL string - - httpClient *http.Client -} - -func (db *signalfxDB) getURL() string { - if db.URL != "" { - return db.URL - } - - return fmt.Sprintf("https://api.%s.signalfx.com", db.Realm) -} - -func (tdb *signalfxDB) Connect(ctx context.Context) error { - return nil -} - -func (tdb *signalfxDB) Ready() bool { - return true -} - -func (db *signalfxDB) Close() error { - // Doesn't need to be closed - return nil -} - -func (db *signalfxDB) GetEndpoints() string { - return fmt.Sprintf("%s:%s", db.getURL(), "443") -} - -func (db *signalfxDB) TestConnection(ctx context.Context) model.ConnectionResult { - url := fmt.Sprintf("%s:%s", db.getURL(), "443") - tester := connection.NewTester( - connection.WithConnectivityTest(connection.ConnectivityStep(model.ProtocolHTTP, url)), - connection.WithPollingTest(connection.TracePollingTestStep(db)), - connection.WithAuthenticationTest(connection.NewTestStep(func(ctx context.Context) (string, error) { - _, err := db.GetTraceByID(ctx, trace.TraceID{}.String()) - if strings.Contains(strings.ToLower(err.Error()), "401") { - return "Tracetest tried to execute an signalFX API request but it failed due to authentication issues", err - } - - return "Tracetest managed to authenticate with signalFX", nil - })), - ) - return tester.TestConnection(ctx) -} - -func (db *signalfxDB) GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) { - timestamps, err := db.getSegmentsTimestamps(ctx, traceID) - if err != nil { - return traces.Trace{}, fmt.Errorf("coult not get trace segment timestamps: %w", err) - } - - if len(timestamps) == 0 { - return traces.Trace{}, connection.ErrTraceNotFound - } - - traceSpans := make([]traces.Span, 0) - - for _, timestamp := range timestamps { - segmentSpans, err := db.getSegmentSpans(ctx, traceID, timestamp) - if err != nil { - return traces.Trace{}, fmt.Errorf("could not get segment spans: %w", err) - } - - for _, signalFxSpan := range segmentSpans { - span := convertSignalFXSpan(signalFxSpan) - traceSpans = append(traceSpans, span) - } - } - - if len(traceSpans) == 0 { - return traces.Trace{}, connection.ErrTraceNotFound - } - - return traces.NewTrace(traceID, traceSpans), nil -} - -func (db signalfxDB) getSegmentsTimestamps(ctx context.Context, traceID string) ([]int64, error) { - url := fmt.Sprintf("%s/v2/apm/trace/%s/segments", db.getURL(), traceID) - request, err := http.NewRequest("GET", url, nil) - if err != nil { - return []int64{}, fmt.Errorf("could not create request: %w", err) - } - - request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", db.Token)) - - response, err := db.httpClient.Do(request) - if err != nil { - return []int64{}, fmt.Errorf("could not execute request: %w", err) - } - - if response.StatusCode != http.StatusOK { - return []int64{}, fmt.Errorf("service responded with a non ok status code: %s", strconv.Itoa(response.StatusCode)) - } - - defer response.Body.Close() - bodyContent, err := ioutil.ReadAll(response.Body) - if err != nil { - return []int64{}, fmt.Errorf("could not read response body: %w", err) - } - - timestamps := make([]int64, 0) - - err = json.Unmarshal(bodyContent, ×tamps) - if err != nil { - return []int64{}, fmt.Errorf("could not unmarshal response: %w", err) - } - - return timestamps, nil -} - -func (db signalfxDB) getSegmentSpans(ctx context.Context, traceID string, timestamp int64) ([]signalFXSpan, error) { - url := fmt.Sprintf("%s/v2/apm/trace/%s/%d", db.getURL(), traceID, timestamp) - request, err := http.NewRequest("GET", url, nil) - if err != nil { - return []signalFXSpan{}, fmt.Errorf("could not create request: %w", err) - } - - request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", db.Token)) - - response, err := db.httpClient.Do(request) - if err != nil { - return []signalFXSpan{}, fmt.Errorf("could not execute request: %w", err) - } - - if response.StatusCode != 200 { - return []signalFXSpan{}, nil - } - - defer response.Body.Close() - bodyContent, err := ioutil.ReadAll(response.Body) - if err != nil { - return []signalFXSpan{}, fmt.Errorf("could not read response body: %w", err) - } - - spans := make([]signalFXSpan, 0) - - err = json.Unmarshal(bodyContent, &spans) - if err != nil { - return []signalFXSpan{}, fmt.Errorf("could not unmarshal response: %w", err) - } - - return spans, nil -} - -func convertSignalFXSpan(in signalFXSpan) traces.Span { - attributes := traces.NewAttributes() - for name, value := range in.Tags { - attributes.Set(name, value) - } - - for name, value := range in.ProcessTags { - attributes.Set(name, value) - } - - attributes.Set(traces.TracetestMetadataFieldParentID, in.ParentID) - attributes.Set(traces.TracetestMetadataFieldKind, attributes.Get("span.kind")) - attributes.Delete("span.kind") - - spanID, _ := trace.SpanIDFromHex(in.SpanID) - startTime, _ := time.Parse(time.RFC3339, in.StartTime) - endTime := startTime.Add(time.Duration(in.Duration) * time.Microsecond) - - return traces.Span{ - ID: spanID, - Name: in.Name, - StartTime: startTime, - EndTime: endTime, - Attributes: attributes, - } -} - -func newSignalFXDB(cfg *datastore.SignalFXConfig) (DataStore, error) { - return &signalfxDB{ - Realm: cfg.Realm, - Token: cfg.Token, - httpClient: http.DefaultClient, - }, nil -} - -type signalFXSpan struct { - TraceID string `json:"traceId"` - SpanID string `json:"spanId"` - ParentID string `json:"parentId"` - Name string `json:"operationName"` - StartTime string `json:"startTime"` - Duration int `json:"durationMicros"` - Tags map[string]string `json:"tags"` - ProcessTags map[string]string `json:"processTags"` -} diff --git a/agent/workers/datastores/sumologicdb.go b/agent/workers/datastores/sumologicdb.go deleted file mode 100644 index 9d8e65e3c3..0000000000 --- a/agent/workers/datastores/sumologicdb.go +++ /dev/null @@ -1,315 +0,0 @@ -package datastores - -import ( - "context" - "encoding/base64" - "encoding/json" - "fmt" - "io" - "net/http" - "strings" - "time" - - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/pkg/id" - "github.com/kubeshop/tracetest/server/tracedb/connection" - "github.com/kubeshop/tracetest/server/traces" - "go.opentelemetry.io/otel/trace" -) - -type sumologicDB struct { - realDataStore - - URL string - AccessID string - AccessKey string -} - -type sumologicSpanSummary struct { - ID string `json:"id"` - Name string `json:"operationName"` - ParentID string `json:"parentId"` - StartedAt string `json:"startedAt"` - Duration int64 `json:"duration"` -} - -type getTraceSpansResponse struct { - Page []sumologicSpanSummary `json:"spanPage"` - TotalCount int `json:"totalCount"` - Next string `json:"next"` -} - -func NewSumoLogicDB(config *datastore.SumoLogicConfig) (DataStore, error) { - if config == nil { - return nil, fmt.Errorf("empty config") - } - - return &sumologicDB{ - URL: config.URL, - AccessID: config.AccessID, - AccessKey: config.AccessKey, - }, nil -} - -// Close implements TraceDB. -func (db *sumologicDB) Close() error { - return nil -} - -// Connect implements TraceDB. -func (db *sumologicDB) Connect(ctx context.Context) error { - return nil -} - -// GetEndpoints implements TraceDB. -func (db *sumologicDB) GetEndpoints() string { - return db.URL -} - -func (db *sumologicDB) TestConnection(ctx context.Context) model.ConnectionResult { - tester := connection.NewTester( - connection.WithConnectivityTest(connection.ConnectivityStep(model.ProtocolHTTP, db.GetEndpoints())), - connection.WithPollingTest(connection.TracePollingTestStep(db)), - connection.WithAuthenticationTest(connection.NewTestStep(func(ctx context.Context) (string, error) { - _, err := db.GetTraceByID(ctx, id.NewRandGenerator().TraceID().String()) - if strings.Contains(err.Error(), "Expected 200, got 401") { - return "Tracetest tried to execute a request but it failed due to authentication issues", err - } - - return "Tracetest managed to authenticate with Sumo Logic", nil - })), - ) - - return tester.TestConnection(ctx) -} - -// GetTraceByID implements TraceDB. -func (db *sumologicDB) GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) { - summaries, err := db.getTraceSpans(ctx, traceID, "") - if err != nil { - return traces.Trace{}, fmt.Errorf("could not get list of spans from trace: %w", err) - } - - spans := db.convertSumoLogicSpanSummariesIntoSpans(summaries) - return traces.NewTrace(traceID, spans), nil -} - -func (db *sumologicDB) getTraceSpans(ctx context.Context, traceID string, token string) ([]sumologicSpanSummary, error) { - spans := make([]sumologicSpanSummary, 0) - response, err := db.getSpansPage(ctx, traceID, "") - if err != nil { - return nil, err - } - - spans = append(spans, response.Page...) - - for response.Next != "" { - response, err = db.getSpansPage(ctx, traceID, response.Next) - if err != nil { - return spans, err - } - - spans = append(spans, response.Page...) - } - - return spans, nil -} - -func (db *sumologicDB) getSpansPage(ctx context.Context, traceID string, token string) (*getTraceSpansResponse, error) { - url := fmt.Sprintf("/api/v1/tracing/traces/%s/spans?limit=100", traceID) - if token != "" { - url = fmt.Sprintf("%s&token=%s", url, token) - } - - req, err := db.newRequest(http.MethodGet, url, nil) - if err != nil { - return nil, err - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, fmt.Errorf("could not execute getTraceRequest: %w", err) - } - - if resp.StatusCode == http.StatusNotFound { - return nil, connection.ErrTraceNotFound - } - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected status code. Expected 200, got %d", resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("could not read getTraceSpans response body: %w", err) - } - - var getTraceSpansResponse getTraceSpansResponse - err = json.Unmarshal(body, &getTraceSpansResponse) - if err != nil { - return nil, fmt.Errorf("could not unmarshal getTraceSpans response body into struct: %w", err) - } - - return &getTraceSpansResponse, nil -} - -func (db *sumologicDB) newRequest(method string, path string, body io.Reader) (*http.Request, error) { - req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s%s", db.URL, path), nil) - if err != nil { - return nil, fmt.Errorf("could not create getTraceRequest: %w", err) - } - - basicAuth := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", db.AccessID, db.AccessKey))) - req.Header.Add("Authorization", fmt.Sprintf("Basic %s", basicAuth)) - - return req, nil -} - -func (db *sumologicDB) convertSumoLogicSpanSummariesIntoSpans(summaries []sumologicSpanSummary) []traces.Span { - spans := make([]traces.Span, 0, len(summaries)) - for _, summary := range summaries { - spanID, _ := trace.SpanIDFromHex(summary.ID) - startTime, _ := time.Parse(time.RFC3339Nano, summary.StartedAt) - endTime := startTime.Add(time.Duration(summary.Duration) * time.Nanosecond) - - spans = append(spans, traces.Span{ - ID: spanID, - Name: summary.Name, - Attributes: traces.NewAttributes(map[string]string{ - traces.TracetestMetadataFieldParentID: summary.ParentID, - }), - StartTime: startTime, - EndTime: endTime, - }) - } - - return spans -} - -// Ready implements TraceDB. -func (db *sumologicDB) Ready() bool { - return true -} - -// AugmentTrace implements TraceAugmenter. -func (db *sumologicDB) AugmentTrace(ctx context.Context, trace *traces.Trace) (*traces.Trace, error) { - if trace == nil { - return nil, nil - } - - spans := make([]traces.Span, 0, len(trace.Flat)) - for id, span := range trace.Flat { - if span.Name == traces.TemporaryRootSpanName || span.Name == traces.TriggerSpanName { - spans = append(spans, *span) - continue - } - - span, err := db.getAugmentedSpan(ctx, trace.ID.String(), id.String()) - if err != nil { - return nil, err - } - - spans = append(spans, *span) - } - - newTrace := traces.NewTrace(trace.ID.String(), spans) - - return &newTrace, nil -} - -type augmentedSpan struct { - ID string `json:"id"` - Name string `json:"operationName"` - ParentID string `json:"parentId"` - StartedAt string `json:"startedAt"` - Duration int64 `json:"duration"` - Attributes map[string]typedValue `json:"fields"` - Events []augmentedSpanEvent `json:"events"` -} - -type typedValue struct { - Type string `json:"type"` - Value string `json:"value"` -} - -type augmentedSpanEvent struct { - Timestamp string `json:"timestamp"` - Name string `json:"name"` - Attributes []eventAttribute `json:"attributes"` -} - -type eventAttribute struct { - Name string `json:"attributeName"` - Value typedValue `json:"attributeValue"` -} - -func (db *sumologicDB) getAugmentedSpan(ctx context.Context, traceID string, spanID string) (*traces.Span, error) { - req, err := db.newRequest(http.MethodGet, fmt.Sprintf("/api/v1/tracing/traces/%s/spans/%s", traceID, spanID), nil) - if err != nil { - return nil, fmt.Errorf("could not create request: %w", err) - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, fmt.Errorf("could not execute augmented span: %w", err) - } - - if resp.StatusCode == http.StatusTooManyRequests { - // We exceeded the rate limit, wait a bit and retry - time.Sleep(10 * time.Second) - return db.getAugmentedSpan(ctx, traceID, spanID) - } - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected status code. Expected 200, got %d", resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("could not read response body: %w", err) - } - - var span augmentedSpan - err = json.Unmarshal(body, &span) - if err != nil { - return nil, fmt.Errorf("could not unmarshal augmented span into struct: %w", err) - } - - id, _ := trace.SpanIDFromHex(span.ID) - startTime, _ := time.Parse(time.RFC3339Nano, span.StartedAt) - endTime := startTime.Add(time.Duration(span.Duration) * time.Nanosecond) - - attributes := map[string]string{ - traces.TracetestMetadataFieldParentID: span.ParentID, - } - for name, typedValue := range span.Attributes { - attributes[name] = typedValue.Value - } - - events := make([]traces.SpanEvent, 0, len(span.Events)) - for _, event := range span.Events { - timestamp, _ := time.Parse(time.RFC3339Nano, event.Timestamp) - eventAttributes := make(map[string]string, len(event.Attributes)) - for _, attribute := range event.Attributes { - eventAttributes[attribute.Name] = attribute.Value.Value - } - - events = append(events, traces.SpanEvent{ - Timestamp: timestamp, - Name: event.Name, - Attributes: traces.NewAttributes(eventAttributes), - }) - } - - return &traces.Span{ - ID: id, - Name: span.Name, - StartTime: startTime, - EndTime: endTime, - Attributes: traces.NewAttributes(attributes), - Events: events, - }, nil -} - -var _ DataStore = &sumologicDB{} diff --git a/agent/workers/datastores/tempodb.go b/agent/workers/datastores/tempodb.go deleted file mode 100644 index 723ef4d89e..0000000000 --- a/agent/workers/datastores/tempodb.go +++ /dev/null @@ -1,166 +0,0 @@ -package datastores - -import ( - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "strings" - - tempopb "github.com/kubeshop/tracetest/agent/internal/proto-gen-go/tempo-idl" - "github.com/kubeshop/tracetest/server/datastore" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/pkg/id" - "github.com/kubeshop/tracetest/server/tracedb/connection" - "github.com/kubeshop/tracetest/server/tracedb/datasource" - "github.com/kubeshop/tracetest/server/traces" - "github.com/pkg/errors" - "go.opentelemetry.io/otel/trace" - v1 "go.opentelemetry.io/proto/otlp/trace/v1" - "google.golang.org/grpc" - "google.golang.org/grpc/status" -) - -func tempoDefaultPorts() []string { - return []string{"9095", ""} -} - -type tempoTraceDB struct { - realDataStore - dataSource datasource.DataSource -} - -func newTempoDB(config *datastore.MultiChannelClientConfig) (DataStore, error) { - dataSource := datasource.New("Tempo", config, datasource.Callbacks{ - HTTP: httpGetTraceByID, - GRPC: grpcGetTraceByID, - }) - - return &tempoTraceDB{ - dataSource: dataSource, - }, nil -} - -func (tdb *tempoTraceDB) Connect(ctx context.Context) error { - return tdb.dataSource.Connect(ctx) -} - -func (ttd *tempoTraceDB) GetEndpoints() string { - return ttd.dataSource.Endpoint() -} - -func (ttd *tempoTraceDB) TestConnection(ctx context.Context) model.ConnectionResult { - tester := connection.NewTester( - connection.WithPortLintingTest(connection.PortLinter("Tempo", tempoDefaultPorts(), ttd.dataSource.Endpoint())), - connection.WithConnectivityTest(ttd.dataSource), - connection.WithPollingTest(connection.TracePollingTestStep(ttd)), - connection.WithAuthenticationTest(connection.NewTestStep(func(ctx context.Context) (string, error) { - _, err := ttd.GetTraceByID(ctx, id.NewRandGenerator().TraceID().String()) - if strings.Contains(err.Error(), "authentication handshake failed") { - return "Tracetest tried to execute a request but it failed due to authentication issues", err - } - - return "Tracetest managed to authenticate with Tempo", nil - })), - ) - - return tester.TestConnection(ctx) -} - -func (ttd *tempoTraceDB) Ready() bool { - return ttd.dataSource.Ready() -} - -func (ttd *tempoTraceDB) GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) { - trace, err := ttd.dataSource.GetTraceByID(ctx, traceID) - return trace, err -} - -func (ttd *tempoTraceDB) Close() error { - return ttd.dataSource.Close() -} - -func grpcGetTraceByID(ctx context.Context, traceID string, conn *grpc.ClientConn) (traces.Trace, error) { - query := tempopb.NewQuerierClient(conn) - - trID, err := trace.TraceIDFromHex(traceID) - if err != nil { - return traces.Trace{}, err - } - - resp, err := query.FindTraceByID(ctx, &tempopb.TraceByIDRequest{ - TraceID: []byte(trID[:]), - }) - if err != nil { - return traces.Trace{}, handleError(err) - } - - if resp.Trace == nil { - return traces.Trace{}, connection.ErrTraceNotFound - } - - if len(resp.Trace.Batches) == 0 { - return traces.Trace{}, connection.ErrTraceNotFound - } - - trace := &v1.TracesData{ - ResourceSpans: resp.GetTrace().GetBatches(), - } - - return traces.FromOtel(trace), nil -} - -type HttpTempoTraceByIDResponse struct { - Batches []*traces.HttpResourceSpans `json:"batches"` -} - -func httpGetTraceByID(ctx context.Context, traceID string, client *datasource.HttpClient) (traces.Trace, error) { - trID, err := trace.TraceIDFromHex(traceID) - if err != nil { - return traces.Trace{}, err - } - resp, err := client.Request(ctx, fmt.Sprintf("/api/traces/%s", trID), http.MethodGet, "") - - if err != nil { - return traces.Trace{}, handleError(err) - } - - if resp.StatusCode == 404 { - return traces.Trace{}, connection.ErrTraceNotFound - } - - var body []byte - if b, err := io.ReadAll(resp.Body); err == nil { - body = b - } else { - fmt.Println(err) - } - - if resp.StatusCode == 401 { - return traces.Trace{}, fmt.Errorf("tempo err: %w %s", errors.New("authentication handshake failed"), string(body)) - } - - var trace HttpTempoTraceByIDResponse - err = json.Unmarshal(body, &trace) - if err != nil { - return traces.Trace{}, err - } - - return traces.FromHttpOtelResourceSpans(trace.Batches), nil -} - -func handleError(err error) error { - if err != nil { - st, ok := status.FromError(err) - if !ok { - return fmt.Errorf("tempo FindTraceByID %w", err) - } - if st.Message() == "trace not found" { - return connection.ErrTraceNotFound - } - return fmt.Errorf("tempo err: %w", err) - } - - return nil -}