From b5cb94e540ec57a5af59edc17821e297e7260f09 Mon Sep 17 00:00:00 2001 From: Sebastian Choren Date: Wed, 20 Sep 2023 16:24:49 -0300 Subject: [PATCH] fix(server): improve handling of unix time attributes (#3170) --- .gitignore | 2 + agent/workers/poller.go | 4 +- .../trigger_result_processor_worker.go | 2 + server/http/mappings/traces.go | 17 +++++-- server/pkg/timing/timing.go | 37 ++++++++++----- server/test/run.go | 47 ++++--------------- server/test/run_test.go | 45 +++++++++--------- server/traces/span_entitiess.go | 17 +++---- server/traces/trace_entities.go | 3 +- 9 files changed, 89 insertions(+), 85 deletions(-) diff --git a/.gitignore b/.gitignore index e838e34e43..a4c489c955 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,5 @@ server/html # autogenerated with `cd cli; make docgen` docs/docs/cli/reference/ + +__debug* diff --git a/agent/workers/poller.go b/agent/workers/poller.go index d3b328fe19..d77c0bf723 100644 --- a/agent/workers/poller.go +++ b/agent/workers/poller.go @@ -168,8 +168,8 @@ func convertTraceInToProtoSpans(trace traces.Trace) []*proto.Span { ParentId: getParentID(span), Name: span.Name, Kind: string(span.Kind), - StartTime: span.StartTime.UnixMicro(), - EndTime: span.EndTime.UnixMicro(), + StartTime: span.StartTime.UnixNano(), + EndTime: span.EndTime.UnixNano(), Attributes: attributes, } spans = append(spans, &protoSpan) diff --git a/server/executor/trigger_result_processor_worker.go b/server/executor/trigger_result_processor_worker.go index 02c2c6cdee..eb7d4a2bd7 100644 --- a/server/executor/trigger_result_processor_worker.go +++ b/server/executor/trigger_result_processor_worker.go @@ -92,6 +92,7 @@ func (r triggerResultProcessorWorker) ProcessItem(ctx context.Context, job Job) } job.Run.State = test.RunStateAwaitingTrace + r.handleDBError(job.Run, r.updater.Update(ctx, job.Run)) r.outputQueue.Enqueue(ctx, job) @@ -120,6 +121,7 @@ func (r triggerResultProcessorWorker) emitMismatchEndpointEvent(ctx context.Cont } func (r triggerResultProcessorWorker) handleExecutionResult(run test.Run) test.Run { + run = run.TriggerCompleted(run.TriggerResult) if run.TriggerResult.Error != nil { run = run.TriggerFailed(fmt.Errorf(run.TriggerResult.Error.ErrorMessage)) diff --git a/server/http/mappings/traces.go b/server/http/mappings/traces.go index e08c707317..417f1a426f 100644 --- a/server/http/mappings/traces.go +++ b/server/http/mappings/traces.go @@ -2,9 +2,9 @@ package mappings import ( "strconv" - "time" "github.com/kubeshop/tracetest/server/openapi" + "github.com/kubeshop/tracetest/server/pkg/timing" "github.com/kubeshop/tracetest/server/traces" "go.opentelemetry.io/otel/trace" ) @@ -81,21 +81,30 @@ func (m Model) Trace(in openapi.Trace) *traces.Trace { flat[sid] = &span } + tree := m.Span(in.Tree, nil) + if spanIsEmpty(tree) && len(flat) == 0 { + return nil + } + return &traces.Trace{ ID: tid, - RootSpan: m.Span(in.Tree, nil), + RootSpan: tree, Flat: flat, } } +func spanIsEmpty(span traces.Span) bool { + return span.ID == trace.SpanID{} && len(span.Attributes) == 0 && span.Name == "" && len(span.Children) == 0 +} + func (m Model) Span(in openapi.Span, parent *traces.Span) traces.Span { sid, _ := trace.SpanIDFromHex(in.Id) span := traces.Span{ ID: sid, Attributes: in.Attributes, Name: in.Name, - StartTime: time.UnixMilli(int64(in.StartTime)), - EndTime: time.UnixMilli(int64(in.EndTime)), + StartTime: timing.ParseUnix(int64(in.StartTime)), + EndTime: timing.ParseUnix(int64(in.EndTime)), Parent: parent, } span.Children = m.Spans(in.Children, &span) diff --git a/server/pkg/timing/timing.go b/server/pkg/timing/timing.go index dfb181c7f5..768afd06f8 100644 --- a/server/pkg/timing/timing.go +++ b/server/pkg/timing/timing.go @@ -1,7 +1,7 @@ package timing import ( - "math" + "fmt" "time" ) @@ -12,25 +12,40 @@ var Now = func() time.Time { func TimeDiff(start, end time.Time) time.Duration { var endDate time.Time if !dateIsZero(end) { - endDate = end + endDate = end.UTC() } else { endDate = Now() } return endDate.Sub(start) } -func DurationInMillieconds(d time.Duration) int { - return int(d.Milliseconds()) +func dateIsZero(in time.Time) bool { + return in.IsZero() || in.Unix() == 0 } -func DurationInNanoseconds(d time.Duration) int { - return int(d.Nanoseconds()) -} +// ParseUnix parses a unix timestamp into a time.Time +// it accepts an integer which can be either milli or nano +func ParseUnix(timestamp int64) time.Time { + // Determine the number of digits in the timestamp + numDigits := len(fmt.Sprintf("%d", timestamp)) -func DurationInSeconds(d time.Duration) int { - return int(math.Ceil(d.Seconds())) + switch { + case numDigits <= 10: + // Timestamp is in seconds + return time.Unix(timestamp, 0) + case numDigits <= 13: + // Timestamp is in milliseconds, convert to nanoseconds and then to time.Time + return time.Unix(0, timestamp*int64(time.Millisecond)) + default: + // Timestamp is in nanoseconds, convert directly to time.Time + return time.Unix(0, timestamp) + } } -func dateIsZero(in time.Time) bool { - return in.IsZero() || in.Unix() == 0 +func MustParse(in string) time.Time { + date, err := time.Parse(time.RFC3339Nano, in) + if err != nil { + panic(fmt.Sprintf("error parsing date: %s", err)) + } + return date } diff --git a/server/test/run.go b/server/test/run.go index 2b0d4f0181..c6197fca77 100644 --- a/server/test/run.go +++ b/server/test/run.go @@ -9,16 +9,13 @@ import ( "github.com/kubeshop/tracetest/server/linter/analyzer" "github.com/kubeshop/tracetest/server/pkg/id" "github.com/kubeshop/tracetest/server/pkg/maps" + "github.com/kubeshop/tracetest/server/pkg/timing" "github.com/kubeshop/tracetest/server/test/trigger" "github.com/kubeshop/tracetest/server/traces" "github.com/kubeshop/tracetest/server/variableset" ) var ( - Now = func() time.Time { - return time.Now().UTC() - } - IDGen = id.NewRandGenerator() ) @@ -28,7 +25,7 @@ func NewRun() Run { TraceID: IDGen.TraceID(), SpanID: IDGen.SpanID(), State: RunStateCreated, - CreatedAt: Now(), + CreatedAt: timing.Now(), } } @@ -43,53 +40,29 @@ func (r Run) TransactionStepResourceID() string { func (r Run) Copy() Run { r.ID = 0 r.Results = nil - r.CreatedAt = Now() + r.CreatedAt = timing.Now() return r } func (r Run) ExecutionTime() int { - return durationInSeconds( - timeDiff(r.CreatedAt, r.CompletedAt), - ) + diff := timing.TimeDiff(r.CreatedAt, r.CompletedAt) + return int(math.Ceil(diff.Seconds())) } func (r Run) TriggerTime() int { - return durationInMillieconds( - timeDiff(r.ServiceTriggeredAt, r.ServiceTriggerCompletedAt), - ) -} - -func timeDiff(start, end time.Time) time.Duration { - var endDate time.Time - if !dateIsZero(end) { - endDate = end - } else { - endDate = Now() - } - return endDate.Sub(start) -} - -func durationInMillieconds(d time.Duration) int { - return int(d.Milliseconds()) -} - -func durationInSeconds(d time.Duration) int { - return int(math.Ceil(d.Seconds())) -} - -func dateIsZero(in time.Time) bool { - return in.IsZero() || in.Unix() == 0 + diff := timing.TimeDiff(r.ServiceTriggeredAt, r.ServiceTriggerCompletedAt) + return int(diff.Milliseconds()) } func (r Run) Start() Run { r.State = RunStateExecuting - r.ServiceTriggeredAt = Now() + r.ServiceTriggeredAt = timing.Now() return r } func (r Run) TriggerCompleted(tr trigger.TriggerResult) Run { - r.ServiceTriggerCompletedAt = Now() + r.ServiceTriggerCompletedAt = timing.Now() r.TriggerResult = tr return r } @@ -128,7 +101,7 @@ func (r Run) SuccessfullyAsserted( } func (r Run) Finish() Run { - r.CompletedAt = Now() + r.CompletedAt = timing.Now() return r } diff --git a/server/test/run_test.go b/server/test/run_test.go index 258b85e8ac..c2f7c5b0a4 100644 --- a/server/test/run_test.go +++ b/server/test/run_test.go @@ -8,6 +8,7 @@ import ( "github.com/kubeshop/tracetest/server/executor/testrunner" "github.com/kubeshop/tracetest/server/linter/analyzer" "github.com/kubeshop/tracetest/server/pkg/maps" + "github.com/kubeshop/tracetest/server/pkg/timing" "github.com/kubeshop/tracetest/server/test" "github.com/kubeshop/tracetest/server/variableset" "github.com/stretchr/testify/assert" @@ -23,49 +24,49 @@ func TestRunExecutionTime(t *testing.T) { { name: "CompletedOk", run: test.Run{ - CreatedAt: time.Date(2022, 01, 25, 12, 45, 33, int(100*time.Millisecond), time.UTC), - CompletedAt: time.Date(2022, 01, 25, 12, 45, 36, int(400*time.Millisecond), time.UTC), + CreatedAt: timing.MustParse("2022-01-25T12:45:33.100000000Z"), + CompletedAt: timing.MustParse("2022-01-25T12:45:36.400000000Z"), }, expected: 4, }, { name: "LessThan1Sec", run: test.Run{ - CreatedAt: time.Date(2022, 01, 25, 12, 45, 33, int(100*time.Millisecond), time.UTC), - CompletedAt: time.Date(2022, 01, 25, 12, 45, 33, int(400*time.Millisecond), time.UTC), + CreatedAt: timing.MustParse("2022-01-25T12:45:33.100000000Z"), + CompletedAt: timing.MustParse("2022-01-25T12:45:33.400000000Z"), }, expected: 1, }, { name: "StillRunning", run: test.Run{ - CreatedAt: time.Date(2022, 01, 25, 12, 45, 33, int(100*time.Millisecond), time.UTC), + CreatedAt: timing.MustParse("2022-01-25T12:45:33.100000000Z"), }, - now: time.Date(2022, 01, 25, 12, 45, 34, int(300*time.Millisecond), time.UTC), + now: timing.MustParse("2022-01-25T12:45:34.300000000Z"), expected: 2, }, { name: "ZeroedDate", run: test.Run{ - CreatedAt: time.Date(2022, 01, 25, 12, 45, 33, int(100*time.Millisecond), time.UTC), + CreatedAt: timing.MustParse("2022-01-25T12:45:33.100000000Z"), CompletedAt: time.Unix(0, 0), }, - now: time.Date(2022, 01, 25, 12, 45, 34, int(300*time.Millisecond), time.UTC), + now: timing.MustParse("2022-01-25T12:45:34.300000000Z"), expected: 2, }, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - now := test.Now + now := timing.Now if c.now.Unix() > 0 { - test.Now = func() time.Time { + timing.Now = func() time.Time { return c.now } } assert.Equal(t, c.expected, c.run.ExecutionTime()) - test.Now = now + timing.Now = now }) } } @@ -80,49 +81,49 @@ func TestRunTriggerTime(t *testing.T) { { name: "CompletedOk", run: test.Run{ - ServiceTriggeredAt: time.Date(2022, 01, 25, 12, 45, 33, int(100*time.Millisecond), time.UTC), - ServiceTriggerCompletedAt: time.Date(2022, 01, 25, 12, 45, 36, int(400*time.Millisecond), time.UTC), + ServiceTriggeredAt: timing.MustParse("2022-01-25T12:45:33.100000000Z"), + ServiceTriggerCompletedAt: timing.MustParse("2022-01-25T12:45:36.400000000Z"), }, expected: 3300, }, { name: "LessThan1Sec", run: test.Run{ - ServiceTriggeredAt: time.Date(2022, 01, 25, 12, 45, 33, int(100*time.Millisecond), time.UTC), - ServiceTriggerCompletedAt: time.Date(2022, 01, 25, 12, 45, 33, int(400*time.Millisecond), time.UTC), + ServiceTriggeredAt: timing.MustParse("2022-01-25T12:45:33.100000000Z"), + ServiceTriggerCompletedAt: timing.MustParse("2022-01-25T12:45:33.400000000Z"), }, expected: 300, }, { name: "StillRunning", run: test.Run{ - ServiceTriggeredAt: time.Date(2022, 01, 25, 12, 45, 33, int(100*time.Millisecond), time.UTC), + ServiceTriggeredAt: timing.MustParse("2022-01-25T12:45:33.100000000Z"), }, - now: time.Date(2022, 01, 25, 12, 45, 34, int(300*time.Millisecond), time.UTC), + now: timing.MustParse("2022-01-25T12:45:34.300000000Z"), expected: 1200, }, { name: "ZeroedDate", run: test.Run{ - ServiceTriggeredAt: time.Date(2022, 01, 25, 12, 45, 33, int(100*time.Millisecond), time.UTC), + ServiceTriggeredAt: timing.MustParse("2022-01-25T12:45:33.100000000Z"), ServiceTriggerCompletedAt: time.Unix(0, 0), }, - now: time.Date(2022, 01, 25, 12, 45, 34, int(300*time.Millisecond), time.UTC), + now: timing.MustParse("2022-01-25T12:45:34.300000000Z"), expected: 1200, }, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - now := test.Now + now := timing.Now if c.now.Unix() > 0 { - test.Now = func() time.Time { + timing.Now = func() time.Time { return c.now } } assert.Equal(t, c.expected, c.run.TriggerTime()) - test.Now = now + timing.Now = now }) } } diff --git a/server/traces/span_entitiess.go b/server/traces/span_entitiess.go index b630ce387d..a3d2ed4719 100644 --- a/server/traces/span_entitiess.go +++ b/server/traces/span_entitiess.go @@ -6,6 +6,7 @@ import ( "strconv" "time" + "github.com/kubeshop/tracetest/server/pkg/timing" "github.com/kubeshop/tracetest/server/test/trigger" "go.opentelemetry.io/otel/trace" ) @@ -124,8 +125,8 @@ func encodeSpan(s Span) encodedSpan { return encodedSpan{ ID: s.ID.String(), Name: s.Name, - StartTime: fmt.Sprintf("%d", s.StartTime.UnixMilli()), - EndTime: fmt.Sprintf("%d", s.EndTime.UnixMilli()), + StartTime: strconv.FormatInt(s.StartTime.UnixMilli(), 10), + EndTime: strconv.FormatInt(s.EndTime.UnixMilli(), 10), Attributes: s.Attributes, Children: encodeChildren(s.Children), } @@ -180,7 +181,7 @@ func (s *Span) decodeSpan(aux encodedSpan) error { } func getTimeFromString(value string) (time.Time, error) { - milliseconds, err := strconv.Atoi(value) + parsedValue, err := strconv.Atoi(value) if err != nil { // Maybe it is in RFC3339 format. Convert it for compatibility sake output, err := time.Parse(time.RFC3339, value) @@ -191,7 +192,7 @@ func getTimeFromString(value string) (time.Time, error) { return output, nil } - return time.UnixMilli(int64(milliseconds)), nil + return timing.ParseUnix(int64(parsedValue)), nil } func decodeChildren(parent *Span, children []encodedSpan, cache spanCache) ([]*Span, error) { @@ -233,8 +234,8 @@ func (span Span) setMetadataAttributes() Span { span.Attributes[TracetestMetadataFieldName] = span.Name span.Attributes[TracetestMetadataFieldType] = spanType(span.Attributes) span.Attributes[TracetestMetadataFieldDuration] = spanDuration(span) - span.Attributes[TracetestMetadataFieldStartTime] = fmt.Sprintf("%d", span.StartTime.UnixNano()) - span.Attributes[TracetestMetadataFieldEndTime] = fmt.Sprintf("%d", span.EndTime.UnixNano()) + span.Attributes[TracetestMetadataFieldStartTime] = strconv.FormatInt(span.StartTime.UTC().UnixNano(), 10) + span.Attributes[TracetestMetadataFieldEndTime] = strconv.FormatInt(span.EndTime.UTC().UnixNano(), 10) if span.Status != nil { span.Attributes[TracetestMetadataFieldStatusCode] = span.Status.Code @@ -249,13 +250,13 @@ func (span Span) setTriggerResultAttributes(result trigger.TriggerResult) Span { case trigger.TriggerTypeHTTP: resp := result.HTTP jsonheaders, _ := json.Marshal(resp.Headers) - span.Attributes["tracetest.response.status"] = fmt.Sprintf("%d", resp.StatusCode) + span.Attributes["tracetest.response.status"] = strconv.Itoa(resp.StatusCode) span.Attributes["tracetest.response.body"] = resp.Body span.Attributes["tracetest.response.headers"] = string(jsonheaders) case trigger.TriggerTypeGRPC: resp := result.GRPC jsonheaders, _ := json.Marshal(resp.Metadata) - span.Attributes["tracetest.response.status"] = fmt.Sprintf("%d", resp.StatusCode) + span.Attributes["tracetest.response.status"] = strconv.Itoa(resp.StatusCode) span.Attributes["tracetest.response.body"] = resp.Body span.Attributes["tracetest.response.headers"] = string(jsonheaders) } diff --git a/server/traces/trace_entities.go b/server/traces/trace_entities.go index bb26b43d01..e0ca79a159 100644 --- a/server/traces/trace_entities.go +++ b/server/traces/trace_entities.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "sort" + "strconv" "strings" "github.com/kubeshop/tracetest/server/pkg/id" @@ -144,7 +145,7 @@ func spanType(attrs Attributes) string { func spanDuration(span Span) string { timeDifference := timing.TimeDiff(span.StartTime, span.EndTime) - return fmt.Sprintf("%d", timing.DurationInNanoseconds(timeDifference)) + return strconv.FormatInt(timeDifference.Nanoseconds(), 10) } func (t *Trace) Sort() Trace {