Skip to content

Commit

Permalink
added hard limit on telemetry queue to reduce mem usage
Browse files Browse the repository at this point in the history
  • Loading branch information
xadhatter committed May 23, 2024
1 parent 517249a commit 9aede89
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 72 deletions.
2 changes: 1 addition & 1 deletion components/broker/engine/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (srv *GRPCServer) Subscribe(stream grpc.Broker_SubscribeServer) error {
case status.Code() == codes.Canceled:
l.Debug("context canceled")
case status.Code() == codes.PermissionDenied:
l.Warn(err)
l.Debug(err)
default:
l.Error(err)
}
Expand Down
76 changes: 40 additions & 36 deletions components/broker/telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/xigxog/kubefox/core"
"github.com/xigxog/kubefox/logkf"
"github.com/xigxog/kubefox/telemetry"
"github.com/xigxog/kubefox/utils"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
Expand All @@ -31,6 +30,10 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

const (
maxQueueSize = 1_000
)

var (
Tracer = otel.Tracer("")
)
Expand Down Expand Up @@ -132,6 +135,33 @@ func (cl *Client) AddSpans(comp *core.Component, spans ...*telemetry.Span) {
}

func (cl *Client) AddProtoSpans(comp *core.Component, spans []*tracev1.Span) {
cl.mutex.Lock()
defer cl.mutex.Unlock()

switch {
case len(spans) == 0:
return
case len(cl.spans) > maxQueueSize:
cl.log.Warnf("maximum number of queued spans exceeded, discarding %d incoming", len(spans))
return
}

i := 0
for _, s := range spans {
if s.TraceState == "kf=1" {
spans[i] = s
i++
}
}
if diff := len(spans) - i; diff > 0 {
cl.log.Debugf("%d spans do not have record state set, discarding", diff)
}
// Truncate spans that do not need to be recorded.
spans = spans[:i]
if len(spans) == 0 {
return
}

resSpans := &tracev1.ResourceSpans{
Resource: &resv1.Resource{
Attributes: []*commonv1.KeyValue{
Expand All @@ -154,13 +184,18 @@ func (cl *Client) AddProtoSpans(comp *core.Component, spans []*tracev1.Span) {
SchemaUrl: semconv.SchemaURL,
}

cl.mutex.Lock()
cl.spans = append(cl.spans, resSpans)
cl.mutex.Unlock()
}

func (cl *Client) AddProtoLogs(comp *core.Component, logRecords []*logsv1.LogRecord) {
if len(logRecords) == 0 {
cl.mutex.Lock()
defer cl.mutex.Unlock()

switch {
case len(logRecords) == 0:
return
case len(cl.logs) > maxQueueSize:
cl.log.Warnf("maximum number of queued log records exceeded, discarding %d incoming", len(logRecords))
return
}

Expand All @@ -175,9 +210,7 @@ func (cl *Client) AddProtoLogs(comp *core.Component, logRecords []*logsv1.LogRec
SchemaUrl: semconv.SchemaURL,
}

cl.mutex.Lock()
cl.logs = append(cl.logs, resSpans)
cl.mutex.Unlock()
}

// TODO have broker/grpc server create resource and pass that instead of comp so
Expand Down Expand Up @@ -225,22 +258,6 @@ func (cl *Client) publishSpans(ctx context.Context) {
cl.mutex.Lock()
defer cl.mutex.Unlock()

i := 0
for _, resSpan := range cl.spans {
if shouldSample(resSpan) {
cl.spans[i] = resSpan
i++
}
}
if diff := len(cl.spans) - i; diff > 0 {
cl.log.Debugf("%d resource spans do not have sample flag set, discarding", diff)
}
// Truncate spans that do not need to be sampled.
cl.spans = cl.spans[:i]
if len(cl.spans) == 0 {
return
}

cl.log.Debugf("uploading %d resource spans", len(cl.spans))
if err := cl.traceClient.UploadTraces(ctx, cl.spans); err != nil {
cl.log.Errorf("error uploading traces: %v", err)
Expand All @@ -254,26 +271,13 @@ func (cl *Client) publishLogs(ctx context.Context) {
}

cl.mutex.Lock()
defer cl.mutex.Unlock()

cl.log.Debugf("uploading %d resource logs", len(cl.logs))
if err := cl.logsClient.UploadLogs(ctx, cl.logs); err != nil {
cl.log.Errorf("error uploading logs: %v", err)
}
cl.logs = nil

cl.mutex.Unlock()
}

func shouldSample(resSpan *tracev1.ResourceSpans) bool {
for _, scopeSpan := range resSpan.ScopeSpans {
for _, s := range scopeSpan.Spans {
if utils.HasBit(s.Flags, 0) {
return true
}
}
}

return false
}

// func (cl *Client) tls() (*tls.Config, error) {
Expand Down
24 changes: 15 additions & 9 deletions components/httpsrv/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -150,11 +151,21 @@ func (srv *Server) ServeHTTP(resWriter http.ResponseWriter, httpReq *http.Reques
}
}

rootSpan = telemetry.StartSpan(fmt.Sprintf("%s %s", httpReq.Method, httpReq.URL), parentTrace,
name := fmt.Sprintf("%s %s", httpReq.Method, httpReq.URL)
rootSpan = telemetry.StartSpan(name, parentTrace,
telemetry.Attr(telemetry.AttrKeyHTTPReqMethod, httpReq.Method),
telemetry.Attr(telemetry.AttrKeyHTTPReqBodySize, httpReq.ContentLength),
)

if parentTrace != nil && utils.HasBit(parentTrace.Flags, 0) {
rootSpan.SetRecord(true)
}

sample := core.GetParamOrHeader(httpReq, api.HeaderTelemetrySample, api.HeaderTelemetrySampleAbbrv)
if strings.EqualFold(sample, "true") || sample == "1" {
rootSpan.SetRecord(true)
}

req = core.NewReq(core.EventOpts{
Source: srv.brk.Component,
Timeout: EventTimeout,
Expand All @@ -163,15 +174,8 @@ func (srv *Server) ServeHTTP(resWriter http.ResponseWriter, httpReq *http.Reques
setHeader(resWriter, api.HeaderEventId, req.Id)

defer func() {
if req.ParentSpan.Sample() {
rootSpan.Flags = utils.SetBit(rootSpan.Flags, 0)
}

if resp != nil {
req.SetContext(resp.Context)
if resp.ParentSpan.Sample() {
rootSpan.Flags = utils.SetBit(rootSpan.Flags, 0)
}
}

rootSpan.SetEventAttributes(req)
Expand All @@ -181,7 +185,9 @@ func (srv *Server) ServeHTTP(resWriter http.ResponseWriter, httpReq *http.Reques

log.Debug("sending spans to broker")

go srv.brk.SendTelemetry(rootSpan.Flatten(), nil)
if rootSpan.Record() {
srv.brk.SendTelemetry(rootSpan.Flatten(), nil)
}
}()

// TODO add standard http attributes
Expand Down
2 changes: 1 addition & 1 deletion components/operator/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var (
"cpu": resource.MustParse("100m"),
},
Limits: v1.ResourceList{
"memory": resource.MustParse("160Mi"),
"memory": resource.MustParse("256Mi"),
"cpu": resource.MustParse("2"),
},
},
Expand Down
10 changes: 1 addition & 9 deletions core/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func applyOpts(evt *Event, cat Category, opts EventOpts) *Event {
return evt
}

func (s *SpanContext) Sample() bool {
func (s *SpanContext) Sampled() bool {
if s == nil {
return false
}
Expand Down Expand Up @@ -611,14 +611,6 @@ func (evt *Event) SetHTTPRequest(httpReq *http.Request, maxEventSize int64) erro
}
}

sample := GetParamOrHeader(httpReq, api.HeaderTelemetrySample, api.HeaderTelemetrySampleAbbrv)
if strings.EqualFold(sample, "true") || sample == "1" {
if evt.ParentSpan == nil {
evt.ParentSpan = &SpanContext{}
}
evt.ParentSpan.Flags = utils.SetBit(evt.ParentSpan.Flags, 0)
}

DelParamOrHeader(httpReq,
api.HeaderVirtualEnv, api.HeaderVirtualEnvAbbrv,
api.HeaderAppDeployment, api.HeaderAppDeploymentAbbrv,
Expand Down
14 changes: 8 additions & 6 deletions kit/kit.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,14 @@ func (svc *kit) recvReq(req *grpc.ComponentEvent) {

ktx.rootSpan.End(err)

spans := ktx.rootSpan.Flatten()
if ktx.rootSpan.Record() {
spans := ktx.rootSpan.Flatten()

var logs []*logsv1.LogRecord
for _, s := range spans {
logs = append(logs, s.LogRecords...)
}
var logs []*logsv1.LogRecord
for _, s := range spans {
logs = append(logs, s.LogRecords...)
}

go svc.brk.SendTelemetry(spans, logs)
svc.brk.SendTelemetry(spans, logs)
}
}
42 changes: 32 additions & 10 deletions telemetry/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"time"

"github.com/xigxog/kubefox/core"
"github.com/xigxog/kubefox/utils"
commonv1 "go.opentelemetry.io/proto/otlp/common/v1"
logsv1 "go.opentelemetry.io/proto/otlp/logs/v1"
resv1 "go.opentelemetry.io/proto/otlp/resource/v1"
Expand Down Expand Up @@ -166,14 +165,6 @@ func (s *Span) SpanContext() *core.SpanContext {
}
}

func (s *Span) ShouldSample() bool {
if s == nil {
return false
}

return utils.HasBit(s.Flags, 0)
}

func (s *Span) SetAttributes(attrs ...Attribute) {
if s == nil {
return
Expand Down Expand Up @@ -240,6 +231,32 @@ func (s *Span) SetEventAttributes(evt *core.Event) {
}
}

func (s *Span) Record() bool {
if s.TraceState == "kf=1" {
return true
}

for _, c := range s.ChildSpans {
if c.Record() {
return true
}
}

return false
}

func (s *Span) SetRecord(record bool) {
if record {
s.TraceState = "kf=1"
} else {
s.TraceState = ""
}

for _, child := range s.ChildSpans {
child.SetRecord(record)
}
}

func (s *Span) Info(msg string) {
s.log(logsv1.SeverityNumber_SEVERITY_NUMBER_INFO, "info", msg)
}
Expand Down Expand Up @@ -273,7 +290,7 @@ func (s *Span) RecordErr(err error) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.Flags = utils.SetBit(s.Flags, 0)
s.SetRecord(true)
s.Events = append(s.Events, &tracev1.Span_Event{
TimeUnixNano: now(),
Name: EventNameException,
Expand Down Expand Up @@ -311,8 +328,13 @@ func (s *Span) End(errs ...error) {

func (s *Span) Flatten() []*Span {
flat := []*Span{s}
record := s.Record()
for _, c := range s.ChildSpans {
flat = append(flat, c.Flatten()...)
record = record || c.Record()
}
if record {
s.SetRecord(true)
}

return flat
Expand Down

0 comments on commit 9aede89

Please sign in to comment.