Skip to content

Commit

Permalink
wip - isolating trace poller start
Browse files Browse the repository at this point in the history
  • Loading branch information
danielbdias committed Sep 11, 2023
1 parent e14b3eb commit a9a80aa
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 20 deletions.
11 changes: 8 additions & 3 deletions server/app/test_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func buildTestPipeline(
eventEmitter,
)

tracePoller := executor.NewTracePoller(
tracePollerStartWorker := executor.NewTracePollerStartWorker(
eventEmitter,
)

tracePollerWorker := executor.NewTracePoller(
pollerExecutor,
execTestUpdater,
subscriptionManager,
Expand Down Expand Up @@ -108,12 +112,13 @@ func buildTestPipeline(
executor.PipelineStep{Processor: triggerResolverWorker, Driver: pgQueue.Channel("trigger_resolve")},
executor.PipelineStep{Processor: triggerExecuterWorker, Driver: pgQueue.Channel("trigger_execute")},
executor.PipelineStep{Processor: triggerResultProcessorWorker, Driver: pgQueue.Channel("trigger_result")},
executor.PipelineStep{Processor: tracePoller, Driver: pgQueue.Channel("tracePoller")},
executor.PipelineStep{Processor: tracePollerStartWorker, Driver: pgQueue.Channel("tracePoller_start")},
executor.PipelineStep{Processor: tracePollerWorker, Driver: pgQueue.Channel("tracePoller")},
executor.PipelineStep{Processor: linterRunner, Driver: pgQueue.Channel("linterRunner")},
executor.PipelineStep{Processor: assertionRunner, Driver: pgQueue.Channel("assertionRunner")},
)

const assertionRunnerStepIndex = 3
const assertionRunnerStepIndex = 6

return executor.NewTestPipeline(
pipeline,
Expand Down
17 changes: 0 additions & 17 deletions server/executor/trace_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ func (tp tracePoller) enqueueJob(ctx context.Context, job Job) {
tp.inputQueue.Enqueue(ctx, job)
}

func (tp tracePoller) isFirstRequest(job Job) bool {
return job.EnqueueCount() == 0
}

func (tp *tracePoller) SetOutputQueue(queue Enqueuer) {
tp.outputQueue = queue
}
Expand All @@ -118,15 +114,6 @@ func (tp *tracePoller) ProcessItem(ctx context.Context, job Job) {
return
}

if tp.isFirstRequest(job) {
err := tp.eventEmitter.Emit(ctx, events.TraceFetchingStart(job.Test.ID, job.Run.ID))
if err != nil {
log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingStart event: %s", job.Test.ID, job.Run.ID, err.Error())
}
}

log.Println("TracePoller] processJob", job.EnqueueCount())

result, err := tp.pollerExecutor.ExecuteRequest(ctx, &job)
run := result.run
if err != nil {
Expand Down Expand Up @@ -226,7 +213,3 @@ func (tp tracePoller) requeue(ctx context.Context, job Job) {
tp.enqueueJob(ctx, job)
}()
}

func isFirstRequest(job *Job) bool {
return !job.Headers.GetBool("requeued")
}
44 changes: 44 additions & 0 deletions server/executor/trace_poller_start_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package executor

import (
"context"
"log"

"github.com/kubeshop/tracetest/server/model/events"
)

type tracePollerStartWorker struct {
eventEmitter EventEmitter
outputQueue Enqueuer
}

func NewTracePollerStartWorker(
eventEmitter EventEmitter,
) *tracePollerStartWorker {
return &tracePollerStartWorker{
eventEmitter: eventEmitter,
}
}

func (w *tracePollerStartWorker) SetOutputQueue(queue Enqueuer) {
w.outputQueue = queue
}

func (w *tracePollerStartWorker) ProcessItem(ctx context.Context, job Job) {
if w.isFirstRequest(job) {
err := w.eventEmitter.Emit(ctx, events.TraceFetchingStart(job.Test.ID, job.Run.ID))
if err != nil {
log.Printf("[TracePoller] Test %s Run %d: fail to emit TracePollingStart event: %s", job.Test.ID, job.Run.ID, err.Error())
}
}

log.Println("[TracePoller] processJob", job.EnqueueCount())

// TODO: check if there is more "pre-processing" things to do here

w.outputQueue.Enqueue(ctx, job)
}

func (w *tracePollerStartWorker) isFirstRequest(job Job) bool {
return job.EnqueueCount() == 0
}

0 comments on commit a9a80aa

Please sign in to comment.