From e8648261f33b04526f5729fc778ac759cf5e5947 Mon Sep 17 00:00:00 2001
From: Daniel Dias
Date: Tue, 12 Sep 2023 19:11:04 -0300
Subject: [PATCH] Adding polling stop strategy

---
 go.work.sum                                   |   1 +
 server/app/test_pipeline.go                   |   6 +-
 server/executor/tracepollerworker/common.go   |  20 ++--
 .../tracepollerworker/evaluator_worker.go     |  90 +++++++--------
 .../tracepollerworker/fetcher_worker.go       |  26 +++--
 .../selector_based_polling_stop_strategy.go   | 103 ++++++++++++++++++
 .../span_count_polling_stop_strategy.go       |  52 +++++++++
 .../tracepollerworker/starter_worker.go       |  34 +++---
 8 files changed, 254 insertions(+), 78 deletions(-)
 create mode 100644 server/executor/tracepollerworker/selector_based_polling_stop_strategy.go
 create mode 100644 server/executor/tracepollerworker/span_count_polling_stop_strategy.go

diff --git a/go.work.sum b/go.work.sum
index 0c1c48ed1e..27c4d87090 100644
--- a/go.work.sum
+++ b/go.work.sum
@@ -151,6 +151,7 @@ github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBj
 github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
 github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
 github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c=
+github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
 github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg=
 github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
diff --git a/server/app/test_pipeline.go b/server/app/test_pipeline.go
index f24c34d3ce..7195c27ad7 100644
--- a/server/app/test_pipeline.go
+++ b/server/app/test_pipeline.go
@@ -4,9 +4,9 @@ import (
 	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/kubeshop/tracetest/server/datastore"
 	"github.com/kubeshop/tracetest/server/executor"
-	"github.com/kubeshop/tracetest/server/executor/tracepollerworker"
 	"github.com/kubeshop/tracetest/server/executor/pollingprofile"
 	"github.com/kubeshop/tracetest/server/executor/testrunner"
+	"github.com/kubeshop/tracetest/server/executor/tracepollerworker"
 	"github.com/kubeshop/tracetest/server/executor/trigger"
 	"github.com/kubeshop/tracetest/server/linter/analyzer"
 	"github.com/kubeshop/tracetest/server/model"
@@ -58,6 +58,7 @@ func buildTestPipeline(
 		dsRepo,
 		execTestUpdater,
 		subscriptionManager,
+		tracer,
 	)

 	traceFetcherWorker := tracepollerworker.NewFetcherWorker(
@@ -66,6 +67,7 @@ func buildTestPipeline(
 		dsRepo,
 		execTestUpdater,
 		subscriptionManager,
+		tracer,
 	)

 	tracePollerEvaluatorWorker := tracepollerworker.NewEvaluatorWorker(
@@ -74,6 +76,8 @@ func buildTestPipeline(
 		dsRepo,
 		execTestUpdater,
 		subscriptionManager,
+		tracepollerworker.NewSelectorBasedPollingStopStrategy(eventEmitter, tracepollerworker.NewSpanCountPollingStopStrategy()),
+		tracer,
 	)

 	triggerResolverWorker := executor.NewTriggerResolverWorker(
diff --git a/server/executor/tracepollerworker/common.go b/server/executor/tracepollerworker/common.go
index c47c62ac42..6892f16bf8 100644
--- a/server/executor/tracepollerworker/common.go
+++ b/server/executor/tracepollerworker/common.go
@@ -2,9 +2,9 @@ package tracepollerworker

 import (
 	"context"
+	"errors"
 	"fmt"
 	"log"
-	"errors"
 	"time"

 	"github.com/kubeshop/tracetest/server/analytics"
@@ -16,15 +16,18 @@ import (
 	"github.com/kubeshop/tracetest/server/subscription"
 	"github.com/kubeshop/tracetest/server/tracedb"
 	"github.com/kubeshop/tracetest/server/tracedb/connection"
+
+	"go.opentelemetry.io/otel/trace"
 )

 type workerState struct {
-	eventEmitter executor.EventEmitter
-	newTraceDBFn tracedb.FactoryFunc
-	dsRepo resourcemanager.Current[datastore.DataStore]
+	eventEmitter        executor.EventEmitter
+	newTraceDBFn        tracedb.FactoryFunc
+	dsRepo              resourcemanager.Current[datastore.DataStore]
 	updater             executor.RunUpdater
 	subscriptionManager *subscription.Manager
-	inputQueue pipeline.Enqueuer[executor.Job]
+	tracer              trace.Tracer
+	inputQueue          pipeline.Enqueuer[executor.Job]
 }

 func getTraceDB(ctx context.Context, state *workerState) (tracedb.TraceDB, error) {
@@ -60,7 +63,6 @@ func handleError(ctx context.Context, job executor.Job, err error, state *worker
 	}
 }

-
 func handleTraceDBError(ctx context.Context, job executor.Job, err error, state *workerState) (bool, string) {
 	run := job.Run

@@ -117,9 +119,9 @@ func requeue(ctx context.Context, job executor.Job, state *workerState) {
 	job.Headers.SetBool("requeued", true)

 	select {
-		default:
-		case <-ctx.Done():
-			return
+	default:
+	case <-ctx.Done():
+		return
 	}

 	state.inputQueue.Enqueue(ctx, job)
diff --git a/server/executor/tracepollerworker/evaluator_worker.go b/server/executor/tracepollerworker/evaluator_worker.go
index b9724ff4f0..4daf8688f6 100644
--- a/server/executor/tracepollerworker/evaluator_worker.go
+++ b/server/executor/tracepollerworker/evaluator_worker.go
@@ -3,7 +3,6 @@ package tracepollerworker
 import (
 	"context"
 	"log"
-	"fmt"

 	"github.com/kubeshop/tracetest/server/datastore"
 	"github.com/kubeshop/tracetest/server/executor"
@@ -14,29 +13,40 @@ import (
 	"github.com/kubeshop/tracetest/server/test"
 	"github.com/kubeshop/tracetest/server/tracedb"
 	"github.com/kubeshop/tracetest/server/traces"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 )

+type PollingStopStrategy interface {
+	Evaluate(ctx context.Context, job *executor.Job, traceDB tracedb.TraceDB, trace *traces.Trace) (bool, string)
+}
+
 type tracePollerEvaluatorWorker struct {
-	state *workerState
+	state        *workerState
 	outputQueue pipeline.Enqueuer[executor.Job]
+	stopStrategy PollingStopStrategy
 }

 func NewEvaluatorWorker(
 	eventEmitter executor.EventEmitter,
 	newTraceDBFn tracedb.FactoryFunc,
 	dsRepo resourcemanager.Current[datastore.DataStore],
-	updater executor.RunUpdater, 
+	updater executor.RunUpdater,
 	subscriptionManager *subscription.Manager,
+	stopStrategy PollingStopStrategy,
+	tracer trace.Tracer,
 ) *tracePollerEvaluatorWorker {
 	state := &workerState{
-		eventEmitter: eventEmitter,
-		newTraceDBFn: newTraceDBFn,
-		dsRepo: dsRepo,
-		updater: updater,
+		eventEmitter:        eventEmitter,
+		newTraceDBFn:        newTraceDBFn,
+		dsRepo:              dsRepo,
+		updater:             updater,
 		subscriptionManager: subscriptionManager,
+		tracer:              tracer,
 	}

-	return &tracePollerEvaluatorWorker{state: state}
+	return &tracePollerEvaluatorWorker{state: state, stopStrategy: stopStrategy}
 }

 func (w *tracePollerEvaluatorWorker) SetInputQueue(queue pipeline.Enqueuer[executor.Job]) {
@@ -47,9 +57,10 @@ func (w *tracePollerEvaluatorWorker) SetOutputQueue(queue pipeline.Enqueuer[exec
 	w.outputQueue = queue
 }

-// TODO: add instrumentation and selector based features
-
 func (w *tracePollerEvaluatorWorker) ProcessItem(ctx context.Context, job executor.Job) {
+	_, span := w.state.tracer.Start(ctx, "Trace Evaluate")
+	defer span.End()
+
 	traceDB, err := getTraceDB(ctx, w.state)
 	if err != nil {
 		log.Printf("[PollerExecutor] Test %s Run %d: GetDataStore error: %s", job.Test.ID, job.Run.ID, err.Error())
@@ -57,14 +68,37 @@ func (w *tracePollerEvaluatorWorker) ProcessItem(ctx context.Context, job execut
 		return
 	}

-	done, reason := w.donePollingTraces(&job, traceDB, job.Run.Trace)
+	done, reason := w.stopStrategy.Evaluate(ctx, &job, traceDB, job.Run.Trace)
+
+	spanCount := 0
+	if job.Run.Trace != nil {
+		spanCount = len(job.Run.Trace.Flat)
+	}
+
+	attrs := []attribute.KeyValue{
+		attribute.String("tracetest.run.trace_poller.trace_id", job.Run.TraceID.String()),
+		attribute.String("tracetest.run.trace_poller.span_id", job.Run.SpanID.String()),
+		attribute.Bool("tracetest.run.trace_poller.succesful", done),
+		attribute.String("tracetest.run.trace_poller.test_id", string(job.Test.ID)),
+		attribute.Int("tracetest.run.trace_poller.amount_retrieved_spans", spanCount),
+	}
+
+	if reason != "" {
+		attrs = append(attrs, attribute.String("tracetest.run.trace_poller.finish_reason", reason))
+	}
+
+	if err != nil {
+		attrs = append(attrs, attribute.String("tracetest.run.trace_poller.error", err.Error()))
+		span.RecordError(err)
+	}
+
+	span.SetAttributes(attrs...)

 	if !done {
 		err := w.state.eventEmitter.Emit(ctx, events.TracePollingIterationInfo(job.Test.ID, job.Run.ID, len(job.Run.Trace.Flat), job.EnqueueCount(), false, reason))
 		if err != nil {
 			log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingIterationInfo event: error: %s", job.Test.ID, job.Run.ID, err.Error())
 		}
-
 		log.Printf("[PollerExecutor] Test %s Run %d: Not done polling. (%s)", job.Test.ID, job.Run.ID, reason)

 		requeue(ctx, job, w.state)
@@ -111,35 +145,3 @@ func (w *tracePollerEvaluatorWorker) ProcessItem(ctx context.Context, job execut

 	w.outputQueue.Enqueue(ctx, job)
 }
-
-func (w *tracePollerEvaluatorWorker) donePollingTraces(job *executor.Job, traceDB tracedb.TraceDB, trace *traces.Trace) (bool, string) {
-	if !traceDB.ShouldRetry() {
-		return true, "TraceDB is not retryable"
-	}
-
-	maxTracePollRetry := job.PollingProfile.Periodic.MaxTracePollRetry()
-	// we're done if we have the same amount of spans after polling or `maxTracePollRetry` times
-	log.Printf("[PollerExecutor] Test %s Run %d: Job count %d, max retries: %d", job.Test.ID, job.Run.ID, job.EnqueueCount(), maxTracePollRetry)
-	if job.EnqueueCount() >= maxTracePollRetry {
-		return true, fmt.Sprintf("Hit MaxRetry of %d", maxTracePollRetry)
-	}
-
-	if job.Run.Trace == nil {
-		return false, "First iteration"
-	}
-
-	haveNotCollectedSpansSinceLastPoll := len(trace.Flat) == len(job.Run.Trace.Flat)
-	haveCollectedSpansInTestRun := len(trace.Flat) > 0
-	haveCollectedOnlyRootNode := len(trace.Flat) == 1 && trace.HasRootSpan()
-
-	// Today we consider that we finished collecting traces
-	// if we haven't collected any new spans since our last poll
-	// and we have collected at least one span for this test run
-	// and we have not collected only the root span
-
-	if haveNotCollectedSpansSinceLastPoll && haveCollectedSpansInTestRun && !haveCollectedOnlyRootNode {
-		return true, fmt.Sprintf("Trace has no new spans. Spans found: %d", len(trace.Flat))
-	}
-
-	return false, fmt.Sprintf("New spans found. Before: %d After: %d", len(job.Run.Trace.Flat), len(trace.Flat))
-}
diff --git a/server/executor/tracepollerworker/fetcher_worker.go b/server/executor/tracepollerworker/fetcher_worker.go
index 22565973f3..ee4977e8ab 100644
--- a/server/executor/tracepollerworker/fetcher_worker.go
+++ b/server/executor/tracepollerworker/fetcher_worker.go
@@ -4,33 +4,36 @@ import (
 	"context"
 	"log"

+	"github.com/kubeshop/tracetest/server/datastore"
+	"github.com/kubeshop/tracetest/server/executor"
 	"github.com/kubeshop/tracetest/server/model/events"
 	"github.com/kubeshop/tracetest/server/pkg/pipeline"
 	"github.com/kubeshop/tracetest/server/resourcemanager"
-	"github.com/kubeshop/tracetest/server/tracedb"
-	"github.com/kubeshop/tracetest/server/datastore"
 	"github.com/kubeshop/tracetest/server/subscription"
-	"github.com/kubeshop/tracetest/server/executor"
+	"github.com/kubeshop/tracetest/server/tracedb"
+	"go.opentelemetry.io/otel/trace"
 )

 type traceFetcherWorker struct {
-	state          *workerState
-	outputQueue    pipeline.Enqueuer[executor.Job]
+	state       *workerState
+	outputQueue pipeline.Enqueuer[executor.Job]
 }

 func NewFetcherWorker(
 	eventEmitter executor.EventEmitter,
 	newTraceDBFn tracedb.FactoryFunc,
 	dsRepo resourcemanager.Current[datastore.DataStore],
-	updater executor.RunUpdater, 
+	updater executor.RunUpdater,
 	subscriptionManager *subscription.Manager,
+	tracer trace.Tracer,
 ) *traceFetcherWorker {
 	state := &workerState{
-		eventEmitter: eventEmitter,
-		newTraceDBFn: newTraceDBFn,
-		dsRepo: dsRepo,
-		updater: updater,
+		eventEmitter:        eventEmitter,
+		newTraceDBFn:        newTraceDBFn,
+		dsRepo:              dsRepo,
+		updater:             updater,
 		subscriptionManager: subscriptionManager,
+		tracer:              tracer,
 	}

 	return &traceFetcherWorker{state: state}
@@ -45,6 +48,9 @@ func (w *traceFetcherWorker) SetOutputQueue(queue pipeline.Enqueuer[executor.Job
 }

 func (w *traceFetcherWorker) ProcessItem(ctx context.Context, job executor.Job) {
+	_, span := w.state.tracer.Start(ctx, "Trace Fetch")
+	defer span.End()
+
 	traceDB, err := getTraceDB(ctx, w.state)
 	if err != nil {
 		log.Printf("[PollerExecutor] Test %s Run %d: GetDataStore error: %s", job.Test.ID, job.Run.ID, err.Error())
diff --git a/server/executor/tracepollerworker/selector_based_polling_stop_strategy.go b/server/executor/tracepollerworker/selector_based_polling_stop_strategy.go
new file mode 100644
index 0000000000..a284eb061b
--- /dev/null
+++ b/server/executor/tracepollerworker/selector_based_polling_stop_strategy.go
@@ -0,0 +1,103 @@
+package tracepollerworker
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/kubeshop/tracetest/server/assertions/selectors"
+	"github.com/kubeshop/tracetest/server/executor"
+	"github.com/kubeshop/tracetest/server/model/events"
+	"github.com/kubeshop/tracetest/server/test"
+	"github.com/kubeshop/tracetest/server/tracedb"
+	"github.com/kubeshop/tracetest/server/traces"
+)
+
+type SelectorBasedPollingStopStrategy struct {
+	eventEmitter    executor.EventEmitter
+	wrappedStrategy PollingStopStrategy
+}
+
+const (
+	selectorBasedPollerExecutorRetryHeader = "SelectorBasedPollerExecutor.retryCount"
+)
+
+func NewSelectorBasedPollingStopStrategy(eventEmitter executor.EventEmitter, strategy PollingStopStrategy) *SelectorBasedPollingStopStrategy {
+	return &SelectorBasedPollingStopStrategy{
+		eventEmitter:    eventEmitter,
+		wrappedStrategy: strategy,
+	}
+}
+
+// Evaluate implements PollingStopStrategy.
+func (s *SelectorBasedPollingStopStrategy) Evaluate(ctx context.Context, job *executor.Job, traceDB tracedb.TraceDB, trace *traces.Trace) (bool, string) {
+	finished, reason := s.wrappedStrategy.Evaluate(ctx, job, traceDB, trace)
+
+	if !finished {
+		job.Headers.SetInt(selectorBasedPollerExecutorRetryHeader, 0)
+		return finished, reason
+	}
+
+	maxNumberRetries := 0
+	if job.PollingProfile.Periodic != nil {
+		maxNumberRetries = job.PollingProfile.Periodic.SelectorMatchRetries
+	}
+
+	currentNumberTries := job.Headers.GetInt(selectorBasedPollerExecutorRetryHeader)
+	if currentNumberTries >= maxNumberRetries {
+		s.eventEmitter.Emit(ctx, events.TracePollingIterationInfo(
+			job.Test.ID,
+			job.Run.ID,
+			len(job.Run.Trace.Flat),
+			currentNumberTries,
+			true,
+			fmt.Sprintf("Some selectors did not match any spans in the current trace, but after %d tries, the trace probably won't change", currentNumberTries),
+		))
+
+		return true, reason
+	}
+
+	allSelectorsMatchSpans := s.allSelectorsMatchSpans(job)
+	if allSelectorsMatchSpans {
+		s.eventEmitter.Emit(ctx, events.TracePollingIterationInfo(
+			job.Test.ID,
+			job.Run.ID,
+			len(job.Run.Trace.Flat),
+			currentNumberTries,
+			true,
+			"All selectors from the test matched at least one span in the current trace",
+		))
+		return true, reason
+	}
+
+	job.Headers.SetInt(selectorBasedPollerExecutorRetryHeader, currentNumberTries+1)
+
+	s.eventEmitter.Emit(ctx, events.TracePollingIterationInfo(
+		job.Test.ID,
+		job.Run.ID,
+		len(job.Run.Trace.Flat),
+		job.Headers.GetInt(selectorBasedPollerExecutorRetryHeader),
+		false,
+		"All selectors from your test must match at least one span in the trace, some of them did not match any",
+	))
+
+	return false, "not all selectors got matching spans in the trace"
+}
+
+func (pe SelectorBasedPollingStopStrategy) allSelectorsMatchSpans(job *executor.Job) bool {
+	allSelectorsHaveMatch := true
+	for _, spec := range job.Test.Specs {
+		spans := selector(spec.Selector).Filter(*job.Run.Trace)
+		if len(spans) == 0 {
+			allSelectorsHaveMatch = false
+		}
+	}
+
+	return allSelectorsHaveMatch
+}
+
+func selector(sq test.SpanQuery) selectors.Selector {
+	sel, _ := selectors.New(string(sq))
+	return sel
+}
+
+var _ PollingStopStrategy = &SelectorBasedPollingStopStrategy{}
diff --git a/server/executor/tracepollerworker/span_count_polling_stop_strategy.go b/server/executor/tracepollerworker/span_count_polling_stop_strategy.go
new file mode 100644
index 0000000000..cafe3f38a2
--- /dev/null
+++ b/server/executor/tracepollerworker/span_count_polling_stop_strategy.go
@@ -0,0 +1,52 @@
+package tracepollerworker
+
+import (
+	"context"
+	"fmt"
+	"log"
+
+	"github.com/kubeshop/tracetest/server/executor"
+	"github.com/kubeshop/tracetest/server/tracedb"
+	"github.com/kubeshop/tracetest/server/traces"
+)
+
+type SpanCountPollingStopStrategy struct{}
+
+func NewSpanCountPollingStopStrategy() *SpanCountPollingStopStrategy {
+	return &SpanCountPollingStopStrategy{}
+}
+
+// Evaluate implements PollingStopStrategy.
+func (s *SpanCountPollingStopStrategy) Evaluate(ctx context.Context, job *executor.Job, traceDB tracedb.TraceDB, trace *traces.Trace) (bool, string) {
+	if !traceDB.ShouldRetry() {
+		return true, "TraceDB is not retryable"
+	}
+
+	maxTracePollRetry := job.PollingProfile.Periodic.MaxTracePollRetry()
+	// we're done if we have the same amount of spans after polling or `maxTracePollRetry` times
+	log.Printf("[PollerExecutor] Test %s Run %d: Job count %d, max retries: %d", job.Test.ID, job.Run.ID, job.EnqueueCount(), maxTracePollRetry)
+	if job.EnqueueCount() >= maxTracePollRetry {
+		return true, fmt.Sprintf("Hit MaxRetry of %d", maxTracePollRetry)
+	}
+
+	if job.Run.Trace == nil {
+		return false, "First iteration"
+	}
+
+	haveNotCollectedSpansSinceLastPoll := len(trace.Flat) == len(job.Run.Trace.Flat)
+	haveCollectedSpansInTestRun := len(trace.Flat) > 0
+	haveCollectedOnlyRootNode := len(trace.Flat) == 1 && trace.HasRootSpan()
+
+	// Today we consider that we finished collecting traces
+	// if we haven't collected any new spans since our last poll
+	// and we have collected at least one span for this test run
+	// and we have not collected only the root span
+
+	if haveNotCollectedSpansSinceLastPoll && haveCollectedSpansInTestRun && !haveCollectedOnlyRootNode {
+		return true, fmt.Sprintf("Trace has no new spans. Spans found: %d", len(trace.Flat))
+	}
+
+	return false, fmt.Sprintf("New spans found. Before: %d After: %d", len(job.Run.Trace.Flat), len(trace.Flat))
+}
+
+var _ PollingStopStrategy = &SpanCountPollingStopStrategy{}
diff --git a/server/executor/tracepollerworker/starter_worker.go b/server/executor/tracepollerworker/starter_worker.go
index 87737302f8..8e512ab922 100644
--- a/server/executor/tracepollerworker/starter_worker.go
+++ b/server/executor/tracepollerworker/starter_worker.go
@@ -2,36 +2,39 @@ package tracepollerworker

 import (
 	"context"
-	"log"
 	"fmt"
+	"log"

+	"github.com/kubeshop/tracetest/server/datastore"
+	"github.com/kubeshop/tracetest/server/executor"
 	"github.com/kubeshop/tracetest/server/model/events"
 	"github.com/kubeshop/tracetest/server/pkg/pipeline"
 	"github.com/kubeshop/tracetest/server/resourcemanager"
-	"github.com/kubeshop/tracetest/server/tracedb"
-	"github.com/kubeshop/tracetest/server/datastore"
 	"github.com/kubeshop/tracetest/server/subscription"
-	"github.com/kubeshop/tracetest/server/executor"
+	"github.com/kubeshop/tracetest/server/tracedb"
+	"go.opentelemetry.io/otel/trace"
 )

 type tracePollerStarterWorker struct {
-	state          *workerState
-	outputQueue    pipeline.Enqueuer[executor.Job]
+	state       *workerState
+	outputQueue pipeline.Enqueuer[executor.Job]
 }

 func NewStarterWorker(
 	eventEmitter executor.EventEmitter,
 	newTraceDBFn tracedb.FactoryFunc,
 	dsRepo resourcemanager.Current[datastore.DataStore],
-	updater executor.RunUpdater, 
+	updater executor.RunUpdater,
 	subscriptionManager *subscription.Manager,
+	tracer trace.Tracer,
 ) *tracePollerStarterWorker {
 	state := &workerState{
-		eventEmitter: eventEmitter,
-		newTraceDBFn: newTraceDBFn,
-		dsRepo: dsRepo,
-		updater: updater,
+		eventEmitter:        eventEmitter,
+		newTraceDBFn:        newTraceDBFn,
+		dsRepo:              dsRepo,
+		updater:             updater,
 		subscriptionManager: subscriptionManager,
+		tracer:              tracer,
 	}

 	return &tracePollerStarterWorker{state: state}
@@ -46,10 +49,13 @@ func (w *tracePollerStarterWorker) SetOutputQueue(queue pipeline.Enqueuer[execut
 }

 func (w *tracePollerStarterWorker) ProcessItem(ctx context.Context, job executor.Job) {
+	_, span := w.state.tracer.Start(ctx, "Trace Poller Start")
+	defer span.End()
+
 	select {
-		default:
-		case <-ctx.Done():
-			return
+	default:
+	case <-ctx.Done():
+		return
 	}

 	log.Println("[TracePoller] processJob", job.EnqueueCount())
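Reviewer note, not part of the patch itself: the wiring added in test_pipeline.go composes the two strategies as a decorator. NewSelectorBasedPollingStopStrategy wraps NewSpanCountPollingStopStrategy, so the span-count check decides first, and the selector-based check can only keep polling alive (up to SelectorMatchRetries) after the wrapped strategy reports that it is done. As a rough sketch of how a further rule could plug into the new PollingStopStrategy interface, consider the hypothetical strategy below; maxSpanCountStopStrategy and its maxSpans field are illustrative names only and do not exist in this change.

// Illustrative sketch, not included in this patch: a hypothetical strategy
// that stops polling once a fixed number of spans has been collected, and
// otherwise delegates to a wrapped strategy, mirroring the decorator shape
// used by SelectorBasedPollingStopStrategy.
package tracepollerworker

import (
	"context"
	"fmt"

	"github.com/kubeshop/tracetest/server/executor"
	"github.com/kubeshop/tracetest/server/tracedb"
	"github.com/kubeshop/tracetest/server/traces"
)

// maxSpanCountStopStrategy is a hypothetical example for review discussion.
type maxSpanCountStopStrategy struct {
	maxSpans        int
	wrappedStrategy PollingStopStrategy
}

// Evaluate implements PollingStopStrategy.
func (s *maxSpanCountStopStrategy) Evaluate(ctx context.Context, job *executor.Job, traceDB tracedb.TraceDB, trace *traces.Trace) (bool, string) {
	// Stop as soon as the collected trace reaches the configured span budget.
	if trace != nil && len(trace.Flat) >= s.maxSpans {
		return true, fmt.Sprintf("Reached the configured maximum of %d spans", s.maxSpans)
	}

	// Otherwise defer the decision to the wrapped strategy, e.g. the one
	// returned by NewSpanCountPollingStopStrategy().
	return s.wrappedStrategy.Evaluate(ctx, job, traceDB, trace)
}

var _ PollingStopStrategy = &maxSpanCountStopStrategy{}

Such a strategy could be handed to NewEvaluatorWorker in place of, or wrapped around, the strategies introduced here, since the worker only depends on the PollingStopStrategy interface.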