From a9a80aab1859bbd20eea3347bdf38470292b6634 Mon Sep 17 00:00:00 2001 From: Daniel Dias Date: Thu, 7 Sep 2023 17:37:23 -0300 Subject: [PATCH] wip - isolating trace poller start --- server/app/test_pipeline.go | 11 +++-- server/executor/trace_poller.go | 17 -------- server/executor/trace_poller_start_worker.go | 44 ++++++++++++++++++++ 3 files changed, 52 insertions(+), 20 deletions(-) create mode 100644 server/executor/trace_poller_start_worker.go diff --git a/server/app/test_pipeline.go b/server/app/test_pipeline.go index 914687e3a0..bdcc3c09c2 100644 --- a/server/app/test_pipeline.go +++ b/server/app/test_pipeline.go @@ -61,7 +61,11 @@ func buildTestPipeline( eventEmitter, ) - tracePoller := executor.NewTracePoller( + tracePollerStartWorker := executor.NewTracePollerStartWorker( + eventEmitter, + ) + + tracePollerWorker := executor.NewTracePoller( pollerExecutor, execTestUpdater, subscriptionManager, @@ -108,12 +112,13 @@ func buildTestPipeline( executor.PipelineStep{Processor: triggerResolverWorker, Driver: pgQueue.Channel("trigger_resolve")}, executor.PipelineStep{Processor: triggerExecuterWorker, Driver: pgQueue.Channel("trigger_execute")}, executor.PipelineStep{Processor: triggerResultProcessorWorker, Driver: pgQueue.Channel("trigger_result")}, - executor.PipelineStep{Processor: tracePoller, Driver: pgQueue.Channel("tracePoller")}, + executor.PipelineStep{Processor: tracePollerStartWorker, Driver: pgQueue.Channel("tracePoller_start")}, + executor.PipelineStep{Processor: tracePollerWorker, Driver: pgQueue.Channel("tracePoller")}, executor.PipelineStep{Processor: linterRunner, Driver: pgQueue.Channel("linterRunner")}, executor.PipelineStep{Processor: assertionRunner, Driver: pgQueue.Channel("assertionRunner")}, ) - const assertionRunnerStepIndex = 3 + const assertionRunnerStepIndex = 6 return executor.NewTestPipeline( pipeline, diff --git a/server/executor/trace_poller.go b/server/executor/trace_poller.go index 4ab45bf969..d81230db0a 100644 --- a/server/executor/trace_poller.go +++ b/server/executor/trace_poller.go @@ -99,10 +99,6 @@ func (tp tracePoller) enqueueJob(ctx context.Context, job Job) { tp.inputQueue.Enqueue(ctx, job) } -func (tp tracePoller) isFirstRequest(job Job) bool { - return job.EnqueueCount() == 0 -} - func (tp *tracePoller) SetOutputQueue(queue Enqueuer) { tp.outputQueue = queue } @@ -118,15 +114,6 @@ func (tp *tracePoller) ProcessItem(ctx context.Context, job Job) { return } - if tp.isFirstRequest(job) { - err := tp.eventEmitter.Emit(ctx, events.TraceFetchingStart(job.Test.ID, job.Run.ID)) - if err != nil { - log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingStart event: %s", job.Test.ID, job.Run.ID, err.Error()) - } - } - - log.Println("TracePoller] processJob", job.EnqueueCount()) - result, err := tp.pollerExecutor.ExecuteRequest(ctx, &job) run := result.run if err != nil { @@ -226,7 +213,3 @@ func (tp tracePoller) requeue(ctx context.Context, job Job) { tp.enqueueJob(ctx, job) }() } - -func isFirstRequest(job *Job) bool { - return !job.Headers.GetBool("requeued") -} diff --git a/server/executor/trace_poller_start_worker.go b/server/executor/trace_poller_start_worker.go new file mode 100644 index 0000000000..4091827d86 --- /dev/null +++ b/server/executor/trace_poller_start_worker.go @@ -0,0 +1,44 @@ +package executor + +import ( + "context" + "log" + + "github.com/kubeshop/tracetest/server/model/events" +) + +type tracePollerStartWorker struct { + eventEmitter EventEmitter + outputQueue Enqueuer +} + +func NewTracePollerStartWorker( + eventEmitter EventEmitter, +) *tracePollerStartWorker { + return &tracePollerStartWorker{ + eventEmitter: eventEmitter, + } +} + +func (w *tracePollerStartWorker) SetOutputQueue(queue Enqueuer) { + w.outputQueue = queue +} + +func (w *tracePollerStartWorker) ProcessItem(ctx context.Context, job Job) { + if w.isFirstRequest(job) { + err := w.eventEmitter.Emit(ctx, events.TraceFetchingStart(job.Test.ID, job.Run.ID)) + if err != nil { + log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingStart event: %s", job.Test.ID, job.Run.ID, err.Error()) + } + } + + log.Println("[TracePoller] processJob", job.EnqueueCount()) + + // TODO: check if there is more "pre-processing" things to do here + + w.outputQueue.Enqueue(ctx, job) +} + +func (w *tracePollerStartWorker) isFirstRequest(job Job) bool { + return job.EnqueueCount() == 0 +}