Skip to content

Commit

Permalink
Adding polling stop strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
danielbdias committed Sep 12, 2023
1 parent b5a2925 commit e864826
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 78 deletions.
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBj
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg=
github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
Expand Down
6 changes: 5 additions & 1 deletion server/app/test_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/tracepollerworker"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/testrunner"
"github.com/kubeshop/tracetest/server/executor/tracepollerworker"
"github.com/kubeshop/tracetest/server/executor/trigger"
"github.com/kubeshop/tracetest/server/linter/analyzer"
"github.com/kubeshop/tracetest/server/model"
Expand Down Expand Up @@ -58,6 +58,7 @@ func buildTestPipeline(
dsRepo,
execTestUpdater,
subscriptionManager,
tracer,
)

traceFetcherWorker := tracepollerworker.NewFetcherWorker(
Expand All @@ -66,6 +67,7 @@ func buildTestPipeline(
dsRepo,
execTestUpdater,
subscriptionManager,
tracer,
)

tracePollerEvaluatorWorker := tracepollerworker.NewEvaluatorWorker(
Expand All @@ -74,6 +76,8 @@ func buildTestPipeline(
dsRepo,
execTestUpdater,
subscriptionManager,
tracepollerworker.NewSelectorBasedPollingStopStrategy(eventEmitter, tracepollerworker.NewSpanCountPollingStopStrategy()),
tracer,
)

triggerResolverWorker := executor.NewTriggerResolverWorker(
Expand Down
20 changes: 11 additions & 9 deletions server/executor/tracepollerworker/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package tracepollerworker

import (
"context"
"errors"
"fmt"
"log"
"errors"
"time"

"github.com/kubeshop/tracetest/server/analytics"
Expand All @@ -16,15 +16,18 @@ import (
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/tracedb"
"github.com/kubeshop/tracetest/server/tracedb/connection"

"go.opentelemetry.io/otel/trace"
)

// workerState bundles the dependencies shared by every trace-poller worker
// (triggerer, fetcher, evaluator). It is built once per worker by the
// corresponding New*Worker constructor and passed around by pointer.
//
// NOTE(review): the pasted diff interleaved removed and added lines, which
// duplicated several fields; duplicate field names are a compile error in Go.
// This is the reconstructed post-commit struct, which adds the tracer field.
type workerState struct {
	eventEmitter        executor.EventEmitter                      // emits test-run lifecycle events
	newTraceDBFn        tracedb.FactoryFunc                        // builds a TraceDB client for the current datastore
	dsRepo              resourcemanager.Current[datastore.DataStore] // resolves the currently-configured datastore
	updater             executor.RunUpdater                        // persists run mutations
	subscriptionManager *subscription.Manager                      // pub/sub used to notify run errors
	tracer              trace.Tracer                               // OpenTelemetry tracer for worker-internal spans
	inputQueue          pipeline.Enqueuer[executor.Job]            // queue jobs are re-enqueued on when polling must continue
}

func getTraceDB(ctx context.Context, state *workerState) (tracedb.TraceDB, error) {
Expand Down Expand Up @@ -60,7 +63,6 @@ func handleError(ctx context.Context, job executor.Job, err error, state *worker
}
}


func handleTraceDBError(ctx context.Context, job executor.Job, err error, state *workerState) (bool, string) {
run := job.Run

Expand Down Expand Up @@ -117,9 +119,9 @@ func requeue(ctx context.Context, job executor.Job, state *workerState) {
job.Headers.SetBool("requeued", true)

select {
default:
case <-ctx.Done():
return
default:
case <-ctx.Done():
return
}

state.inputQueue.Enqueue(ctx, job)
Expand Down
90 changes: 46 additions & 44 deletions server/executor/tracepollerworker/evaluator_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package tracepollerworker
import (
"context"
"log"
"fmt"

"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/executor"
Expand All @@ -14,29 +13,40 @@ import (
"github.com/kubeshop/tracetest/server/test"
"github.com/kubeshop/tracetest/server/tracedb"
"github.com/kubeshop/tracetest/server/traces"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// PollingStopStrategy decides whether the evaluator worker should stop
// polling the trace backend for a given test run, or re-enqueue the job
// for another polling iteration.
type PollingStopStrategy interface {
	// Evaluate inspects the job and the trace collected so far and returns
	// true when polling should stop, together with a human-readable reason
	// (used in events, logs, and span attributes).
	Evaluate(ctx context.Context, job *executor.Job, traceDB tracedb.TraceDB, trace *traces.Trace) (bool, string)
}

// tracePollerEvaluatorWorker is the pipeline stage that decides whether a
// run's trace is complete. It delegates the stop decision to a pluggable
// PollingStopStrategy and forwards finished jobs to the output queue.
//
// NOTE(review): the pasted diff duplicated the state field (old + new line);
// duplicate field names do not compile in Go — deduplicated here.
type tracePollerEvaluatorWorker struct {
	state        *workerState                    // shared worker dependencies
	outputQueue  pipeline.Enqueuer[executor.Job] // next pipeline stage for finished jobs
	stopStrategy PollingStopStrategy             // pluggable "are we done polling?" decision
}

// NewEvaluatorWorker builds the evaluator stage of the trace-poller pipeline.
// The stopStrategy decides when polling is done; the tracer instruments each
// ProcessItem call. Input/output queues are injected later via SetInputQueue
// and SetOutputQueue.
//
// NOTE(review): the pasted diff interleaved the pre- and post-commit parameter
// and field lists (duplicate `updater` parameter, duplicate struct-literal
// keys, two return statements) — none of which compiles. This is the
// reconstructed post-commit constructor, matching the call site in
// buildTestPipeline, which passes the stop strategy and tracer.
func NewEvaluatorWorker(
	eventEmitter executor.EventEmitter,
	newTraceDBFn tracedb.FactoryFunc,
	dsRepo resourcemanager.Current[datastore.DataStore],
	updater executor.RunUpdater,
	subscriptionManager *subscription.Manager,
	stopStrategy PollingStopStrategy,
	tracer trace.Tracer,
) *tracePollerEvaluatorWorker {
	state := &workerState{
		eventEmitter:        eventEmitter,
		newTraceDBFn:        newTraceDBFn,
		dsRepo:              dsRepo,
		updater:             updater,
		subscriptionManager: subscriptionManager,
		tracer:              tracer,
	}

	return &tracePollerEvaluatorWorker{state: state, stopStrategy: stopStrategy}
}

func (w *tracePollerEvaluatorWorker) SetInputQueue(queue pipeline.Enqueuer[executor.Job]) {
Expand All @@ -47,24 +57,48 @@ func (w *tracePollerEvaluatorWorker) SetOutputQueue(queue pipeline.Enqueuer[exec
w.outputQueue = queue
}

// TODO: add instrumentation and selector based features

func (w *tracePollerEvaluatorWorker) ProcessItem(ctx context.Context, job executor.Job) {
_, span := w.state.tracer.Start(ctx, "Trace Evaluate")
defer span.End()

traceDB, err := getTraceDB(ctx, w.state)
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: GetDataStore error: %s", job.Test.ID, job.Run.ID, err.Error())
handleError(ctx, job, err, w.state)
return
}

done, reason := w.donePollingTraces(&job, traceDB, job.Run.Trace)
done, reason := w.stopStrategy.Evaluate(ctx, &job, traceDB, job.Run.Trace)

spanCount := 0
if job.Run.Trace != nil {
spanCount = len(job.Run.Trace.Flat)
}

attrs := []attribute.KeyValue{
attribute.String("tracetest.run.trace_poller.trace_id", job.Run.TraceID.String()),
attribute.String("tracetest.run.trace_poller.span_id", job.Run.SpanID.String()),
attribute.Bool("tracetest.run.trace_poller.succesful", done),
attribute.String("tracetest.run.trace_poller.test_id", string(job.Test.ID)),
attribute.Int("tracetest.run.trace_poller.amount_retrieved_spans", spanCount),
}

if reason != "" {
attrs = append(attrs, attribute.String("tracetest.run.trace_poller.finish_reason", reason))
}

if err != nil {
attrs = append(attrs, attribute.String("tracetest.run.trace_poller.error", err.Error()))
span.RecordError(err)
}

span.SetAttributes(attrs...)

if !done {
err := w.state.eventEmitter.Emit(ctx, events.TracePollingIterationInfo(job.Test.ID, job.Run.ID, len(job.Run.Trace.Flat), job.EnqueueCount(), false, reason))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingIterationInfo event: error: %s", job.Test.ID, job.Run.ID, err.Error())
}

log.Printf("[PollerExecutor] Test %s Run %d: Not done polling. (%s)", job.Test.ID, job.Run.ID, reason)

requeue(ctx, job, w.state)
Expand Down Expand Up @@ -111,35 +145,3 @@ func (w *tracePollerEvaluatorWorker) ProcessItem(ctx context.Context, job execut

w.outputQueue.Enqueue(ctx, job)
}

// donePollingTraces reports whether the worker should stop polling for this
// run's trace, along with a human-readable reason. Stop conditions, in order:
// the backend does not support retrying, the retry budget is exhausted, or a
// poll returned no new spans beyond what was already collected (and we have
// more than just the root span).
func (w *tracePollerEvaluatorWorker) donePollingTraces(job *executor.Job, traceDB tracedb.TraceDB, trace *traces.Trace) (bool, string) {
	// Backends that answer synchronously (no async ingestion) never need a retry.
	if !traceDB.ShouldRetry() {
		return true, "TraceDB is not retryable"
	}

	// Give up once the job has been enqueued as many times as the polling
	// profile allows.
	retryLimit := job.PollingProfile.Periodic.MaxTracePollRetry()
	log.Printf("[PollerExecutor] Test %s Run %d: Job count %d, max retries: %d", job.Test.ID, job.Run.ID, job.EnqueueCount(), retryLimit)
	if job.EnqueueCount() >= retryLimit {
		return true, fmt.Sprintf("Hit MaxRetry of %d", retryLimit)
	}

	// No baseline trace yet — always poll at least once more.
	if job.Run.Trace == nil {
		return false, "First iteration"
	}

	currentSpanCount := len(trace.Flat)
	previousSpanCount := len(job.Run.Trace.Flat)

	noNewSpans := currentSpanCount == previousSpanCount
	hasAnySpans := currentSpanCount > 0
	onlyRootSpan := currentSpanCount == 1 && trace.HasRootSpan()

	// We consider collection finished when a poll added nothing new, we have
	// at least one span for the run, and that span is not just the root node.
	if noNewSpans && hasAnySpans && !onlyRootSpan {
		return true, fmt.Sprintf("Trace has no new spans. Spans found: %d", currentSpanCount)
	}

	return false, fmt.Sprintf("New spans found. Before: %d After: %d", previousSpanCount, currentSpanCount)
}
26 changes: 16 additions & 10 deletions server/executor/tracepollerworker/fetcher_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,36 @@ import (
"context"
"log"

"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/model/events"
"github.com/kubeshop/tracetest/server/pkg/pipeline"
"github.com/kubeshop/tracetest/server/resourcemanager"
"github.com/kubeshop/tracetest/server/tracedb"
"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/trace"
)

// traceFetcherWorker is the pipeline stage that fetches the current trace
// from the configured trace backend and hands the job to the next stage.
//
// NOTE(review): the pasted diff duplicated both fields (old + new formatting
// lines); duplicate field names do not compile in Go — deduplicated here.
type traceFetcherWorker struct {
	state       *workerState                    // shared worker dependencies
	outputQueue pipeline.Enqueuer[executor.Job] // next pipeline stage
}

func NewFetcherWorker(
eventEmitter executor.EventEmitter,
newTraceDBFn tracedb.FactoryFunc,
dsRepo resourcemanager.Current[datastore.DataStore],
updater executor.RunUpdater,
updater executor.RunUpdater,
subscriptionManager *subscription.Manager,
tracer trace.Tracer,
) *traceFetcherWorker {
state := &workerState{
eventEmitter: eventEmitter,
newTraceDBFn: newTraceDBFn,
dsRepo: dsRepo,
updater: updater,
eventEmitter: eventEmitter,
newTraceDBFn: newTraceDBFn,
dsRepo: dsRepo,
updater: updater,
subscriptionManager: subscriptionManager,
tracer: tracer,
}

return &traceFetcherWorker{state: state}
Expand All @@ -45,6 +48,9 @@ func (w *traceFetcherWorker) SetOutputQueue(queue pipeline.Enqueuer[executor.Job
}

func (w *traceFetcherWorker) ProcessItem(ctx context.Context, job executor.Job) {
_, span := w.state.tracer.Start(ctx, "Trace Fetch")
defer span.End()

traceDB, err := getTraceDB(ctx, w.state)
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: GetDataStore error: %s", job.Test.ID, job.Run.ID, err.Error())
Expand Down
Loading

0 comments on commit e864826

Please sign in to comment.