diff --git a/components/broker/engine/grpc_server.go b/components/broker/engine/grpc_server.go index 7207655..775e802 100644 --- a/components/broker/engine/grpc_server.go +++ b/components/broker/engine/grpc_server.go @@ -127,7 +127,7 @@ func (srv *GRPCServer) Subscribe(stream grpc.Broker_SubscribeServer) error { case status.Code() == codes.Canceled: l.Debug("context canceled") case status.Code() == codes.PermissionDenied: - l.Warn(err) + l.Debug(err) default: l.Error(err) } diff --git a/components/broker/telemetry/client.go b/components/broker/telemetry/client.go index 44dc024..d705525 100644 --- a/components/broker/telemetry/client.go +++ b/components/broker/telemetry/client.go @@ -17,7 +17,6 @@ import ( "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/logkf" "github.com/xigxog/kubefox/telemetry" - "github.com/xigxog/kubefox/utils" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" @@ -31,6 +30,10 @@ import ( "google.golang.org/grpc/credentials/insecure" ) +const ( + maxQueueSize = 1_000 +) + var ( Tracer = otel.Tracer("") ) @@ -132,6 +135,33 @@ func (cl *Client) AddSpans(comp *core.Component, spans ...*telemetry.Span) { } func (cl *Client) AddProtoSpans(comp *core.Component, spans []*tracev1.Span) { + cl.mutex.Lock() + defer cl.mutex.Unlock() + + switch { + case len(spans) == 0: + return + case len(cl.spans) > maxQueueSize: + cl.log.Warnf("maximum number of queued spans exceeded, discarding %d incoming", len(spans)) + return + } + + i := 0 + for _, s := range spans { + if s.TraceState == "kf=1" { + spans[i] = s + i++ + } + } + if diff := len(spans) - i; diff > 0 { + cl.log.Debugf("%d spans do not have record state set, discarding", diff) + } + // Truncate spans that do not need to be recorded. + spans = spans[:i] + if len(spans) == 0 { + return + } + resSpans := &tracev1.ResourceSpans{ Resource: &resv1.Resource{ Attributes: []*commonv1.KeyValue{ @@ -154,13 +184,18 @@ func (cl *Client) AddProtoSpans(comp *core.Component, spans []*tracev1.Span) { SchemaUrl: semconv.SchemaURL, } - cl.mutex.Lock() cl.spans = append(cl.spans, resSpans) - cl.mutex.Unlock() } func (cl *Client) AddProtoLogs(comp *core.Component, logRecords []*logsv1.LogRecord) { - if len(logRecords) == 0 { + cl.mutex.Lock() + defer cl.mutex.Unlock() + + switch { + case len(logRecords) == 0: + return + case len(cl.logs) > maxQueueSize: + cl.log.Warnf("maximum number of queued log records exceeded, discarding %d incoming", len(logRecords)) return } @@ -175,9 +210,7 @@ func (cl *Client) AddProtoLogs(comp *core.Component, logRecords []*logsv1.LogRec SchemaUrl: semconv.SchemaURL, } - cl.mutex.Lock() cl.logs = append(cl.logs, resSpans) - cl.mutex.Unlock() } // TODO have broker/grpc server create resource and pass that instead of comp so @@ -225,22 +258,6 @@ func (cl *Client) publishSpans(ctx context.Context) { cl.mutex.Lock() defer cl.mutex.Unlock() - i := 0 - for _, resSpan := range cl.spans { - if shouldSample(resSpan) { - cl.spans[i] = resSpan - i++ - } - } - if diff := len(cl.spans) - i; diff > 0 { - cl.log.Debugf("%d resource spans do not have sample flag set, discarding", diff) - } - // Truncate spans that do not need to be sampled. - cl.spans = cl.spans[:i] - if len(cl.spans) == 0 { - return - } - cl.log.Debugf("uploading %d resource spans", len(cl.spans)) if err := cl.traceClient.UploadTraces(ctx, cl.spans); err != nil { cl.log.Errorf("error uploading traces: %v", err) @@ -254,26 +271,13 @@ func (cl *Client) publishLogs(ctx context.Context) { } cl.mutex.Lock() + defer cl.mutex.Unlock() cl.log.Debugf("uploading %d resource logs", len(cl.logs)) if err := cl.logsClient.UploadLogs(ctx, cl.logs); err != nil { cl.log.Errorf("error uploading logs: %v", err) } cl.logs = nil - - cl.mutex.Unlock() -} - -func shouldSample(resSpan *tracev1.ResourceSpans) bool { - for _, scopeSpan := range resSpan.ScopeSpans { - for _, s := range scopeSpan.Spans { - if utils.HasBit(s.Flags, 0) { - return true - } - } - } - - return false } // func (cl *Client) tls() (*tls.Config, error) { diff --git a/components/httpsrv/server/server.go b/components/httpsrv/server/server.go index 2410585..95c80bc 100644 --- a/components/httpsrv/server/server.go +++ b/components/httpsrv/server/server.go @@ -16,6 +16,7 @@ import ( "net" "net/http" "strconv" + "strings" "sync" "time" @@ -150,11 +151,21 @@ func (srv *Server) ServeHTTP(resWriter http.ResponseWriter, httpReq *http.Reques } } - rootSpan = telemetry.StartSpan(fmt.Sprintf("%s %s", httpReq.Method, httpReq.URL), parentTrace, + name := fmt.Sprintf("%s %s", httpReq.Method, httpReq.URL) + rootSpan = telemetry.StartSpan(name, parentTrace, telemetry.Attr(telemetry.AttrKeyHTTPReqMethod, httpReq.Method), telemetry.Attr(telemetry.AttrKeyHTTPReqBodySize, httpReq.ContentLength), ) + if parentTrace != nil && utils.HasBit(parentTrace.Flags, 0) { + rootSpan.SetRecord(true) + } + + sample := core.GetParamOrHeader(httpReq, api.HeaderTelemetrySample, api.HeaderTelemetrySampleAbbrv) + if strings.EqualFold(sample, "true") || sample == "1" { + rootSpan.SetRecord(true) + } + req = core.NewReq(core.EventOpts{ Source: srv.brk.Component, Timeout: EventTimeout, @@ -163,15 +174,8 @@ func (srv *Server) ServeHTTP(resWriter http.ResponseWriter, httpReq *http.Reques setHeader(resWriter, api.HeaderEventId, req.Id) defer func() { - if req.ParentSpan.Sample() { - rootSpan.Flags = utils.SetBit(rootSpan.Flags, 0) - } - if resp != nil { req.SetContext(resp.Context) - if resp.ParentSpan.Sample() { - rootSpan.Flags = utils.SetBit(rootSpan.Flags, 0) - } } rootSpan.SetEventAttributes(req) @@ -181,7 +185,9 @@ func (srv *Server) ServeHTTP(resWriter http.ResponseWriter, httpReq *http.Reques log.Debug("sending spans to broker") - go srv.brk.SendTelemetry(rootSpan.Flatten(), nil) + if rootSpan.Record() { + srv.brk.SendTelemetry(rootSpan.Flatten(), nil) + } }() // TODO add standard http attributes diff --git a/components/operator/defaults/defaults.go b/components/operator/defaults/defaults.go index 1e16f0c..a9973ff 100644 --- a/components/operator/defaults/defaults.go +++ b/components/operator/defaults/defaults.go @@ -68,7 +68,7 @@ var ( "cpu": resource.MustParse("100m"), }, Limits: v1.ResourceList{ - "memory": resource.MustParse("160Mi"), + "memory": resource.MustParse("256Mi"), "cpu": resource.MustParse("2"), }, }, diff --git a/core/event.go b/core/event.go index 47f85ae..6df331d 100644 --- a/core/event.go +++ b/core/event.go @@ -116,7 +116,7 @@ func applyOpts(evt *Event, cat Category, opts EventOpts) *Event { return evt } -func (s *SpanContext) Sample() bool { +func (s *SpanContext) Sampled() bool { if s == nil { return false } @@ -611,14 +611,6 @@ func (evt *Event) SetHTTPRequest(httpReq *http.Request, maxEventSize int64) erro } } - sample := GetParamOrHeader(httpReq, api.HeaderTelemetrySample, api.HeaderTelemetrySampleAbbrv) - if strings.EqualFold(sample, "true") || sample == "1" { - if evt.ParentSpan == nil { - evt.ParentSpan = &SpanContext{} - } - evt.ParentSpan.Flags = utils.SetBit(evt.ParentSpan.Flags, 0) - } - DelParamOrHeader(httpReq, api.HeaderVirtualEnv, api.HeaderVirtualEnvAbbrv, api.HeaderAppDeployment, api.HeaderAppDeploymentAbbrv, diff --git a/kit/kit.go b/kit/kit.go index ad5d4ab..f953e5b 100644 --- a/kit/kit.go +++ b/kit/kit.go @@ -330,12 +330,14 @@ func (svc *kit) recvReq(req *grpc.ComponentEvent) { ktx.rootSpan.End(err) - spans := ktx.rootSpan.Flatten() + if ktx.rootSpan.Record() { + spans := ktx.rootSpan.Flatten() - var logs []*logsv1.LogRecord - for _, s := range spans { - logs = append(logs, s.LogRecords...) - } + var logs []*logsv1.LogRecord + for _, s := range spans { + logs = append(logs, s.LogRecords...) + } - go svc.brk.SendTelemetry(spans, logs) + svc.brk.SendTelemetry(spans, logs) + } } diff --git a/telemetry/trace.go b/telemetry/trace.go index a333fec..5aa7a45 100644 --- a/telemetry/trace.go +++ b/telemetry/trace.go @@ -18,7 +18,6 @@ import ( "time" "github.com/xigxog/kubefox/core" - "github.com/xigxog/kubefox/utils" commonv1 "go.opentelemetry.io/proto/otlp/common/v1" logsv1 "go.opentelemetry.io/proto/otlp/logs/v1" resv1 "go.opentelemetry.io/proto/otlp/resource/v1" @@ -166,14 +165,6 @@ func (s *Span) SpanContext() *core.SpanContext { } } -func (s *Span) ShouldSample() bool { - if s == nil { - return false - } - - return utils.HasBit(s.Flags, 0) -} - func (s *Span) SetAttributes(attrs ...Attribute) { if s == nil { return @@ -240,6 +231,32 @@ func (s *Span) SetEventAttributes(evt *core.Event) { } } +func (s *Span) Record() bool { + if s.TraceState == "kf=1" { + return true + } + + for _, c := range s.ChildSpans { + if c.Record() { + return true + } + } + + return false +} + +func (s *Span) SetRecord(record bool) { + if record { + s.TraceState = "kf=1" + } else { + s.TraceState = "" + } + + for _, child := range s.ChildSpans { + child.SetRecord(record) + } +} + func (s *Span) Info(msg string) { s.log(logsv1.SeverityNumber_SEVERITY_NUMBER_INFO, "info", msg) } @@ -273,7 +290,7 @@ func (s *Span) RecordErr(err error) { s.mutex.Lock() defer s.mutex.Unlock() - s.Flags = utils.SetBit(s.Flags, 0) + s.SetRecord(true) s.Events = append(s.Events, &tracev1.Span_Event{ TimeUnixNano: now(), Name: EventNameException, @@ -311,8 +328,13 @@ func (s *Span) End(errs ...error) { func (s *Span) Flatten() []*Span { flat := []*Span{s} + record := s.Record() for _, c := range s.ChildSpans { flat = append(flat, c.Flatten()...) + record = record || c.Record() + } + if record { + s.SetRecord(true) } return flat