diff --git a/api/tests.yaml b/api/tests.yaml index 36e9851b95..2966c8ea3f 100644 --- a/api/tests.yaml +++ b/api/tests.yaml @@ -28,7 +28,7 @@ components: type: object properties: id: - type: string + type: integer readOnly: true name: type: string diff --git a/server/app/test_pipeline.go b/server/app/test_pipeline.go index 3c4370739e..914687e3a0 100644 --- a/server/app/test_pipeline.go +++ b/server/app/test_pipeline.go @@ -88,6 +88,7 @@ func buildTestPipeline( tracer, subscriptionManager, eventEmitter, + execTestUpdater, ) cancelRunHandlerFn := executor.HandleRunCancelation(execTestUpdater, tracer, eventEmitter) diff --git a/server/executor/trigger_executer_worker.go b/server/executor/trigger_executer_worker.go index 9bb0313bc8..73b1dc3fc1 100644 --- a/server/executor/trigger_executer_worker.go +++ b/server/executor/trigger_executer_worker.go @@ -8,7 +8,6 @@ import ( "os" "strings" - "github.com/kubeshop/tracetest/server/analytics" triggerer "github.com/kubeshop/tracetest/server/executor/trigger" "github.com/kubeshop/tracetest/server/model/events" "github.com/kubeshop/tracetest/server/test" @@ -80,30 +79,15 @@ func (r triggerExecuterWorker) ProcessItem(ctx context.Context, job Job) { } } - run = r.handleExecutionResult(run, response, err) run.SpanID = response.SpanID - + run.TriggerResult = response.Result + run = run.TriggerCompleted(run.TriggerResult) r.handleDBError(run, r.updater.Update(ctx, run)) job.Run = run r.outputQueue.Enqueue(ctx, job) } -func (r triggerExecuterWorker) handleExecutionResult(run test.Run, response triggerer.Response, err error) test.Run { - run = run.TriggerCompleted(response.Result) - if err != nil { - run = run.TriggerFailed(err) - - analytics.SendEvent("test_run_finished", "error", "", &map[string]string{ - "finalState": string(run.State), - }) - - return run - } - - return run.SuccessfullyTriggered() -} - func isConnectionError(err error) bool { for err != nil { // a dial error means we couldn't open a TCP connection (either host is not available or DNS doesn't exist) diff --git a/server/executor/trigger_resolver_worker.go b/server/executor/trigger_resolver_worker.go index e74629ee57..435048015d 100644 --- a/server/executor/trigger_resolver_worker.go +++ b/server/executor/trigger_resolver_worker.go @@ -107,7 +107,6 @@ func (r triggerResolverWorker) ProcessItem(ctx context.Context, job Job) { }} executor := expression.NewExecutor(ds...) - triggerOptions := &triggerer.ResolveOptions{ Executor: executor, } diff --git a/server/executor/trigger_result_processor_worker.go b/server/executor/trigger_result_processor_worker.go index 28882037ce..080995491a 100644 --- a/server/executor/trigger_result_processor_worker.go +++ b/server/executor/trigger_result_processor_worker.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/kubeshop/tracetest/server/analytics" "github.com/kubeshop/tracetest/server/model" "github.com/kubeshop/tracetest/server/model/events" "github.com/kubeshop/tracetest/server/subscription" @@ -21,11 +22,13 @@ func NewTriggerResultProcessorWorker( tracer trace.Tracer, subscriptionManager *subscription.Manager, eventEmitter EventEmitter, + updater RunUpdater, ) *triggerResultProcessorWorker { return &triggerResultProcessorWorker{ tracer: tracer, subscriptionManager: subscriptionManager, eventEmitter: eventEmitter, + updater: updater, } } @@ -34,6 +37,7 @@ type triggerResultProcessorWorker struct { subscriptionManager *subscription.Manager eventEmitter EventEmitter outputQueue Enqueuer + updater RunUpdater } func (r *triggerResultProcessorWorker) SetOutputQueue(queue Enqueuer) { @@ -56,6 +60,7 @@ func (r triggerResultProcessorWorker) ProcessItem(ctx context.Context, job Job) ctx, pollingSpan := r.tracer.Start(ctx, "Start processing trigger response") defer pollingSpan.End() + job.Run = r.handleExecutionResult(job.Run) triggerResult := job.Run.TriggerResult if triggerResult.Error != nil { err := triggerResult.Error.Error() @@ -85,6 +90,8 @@ func (r triggerResultProcessorWorker) ProcessItem(ctx context.Context, job Job) } } + r.handleDBError(job.Run, r.updater.Update(ctx, job.Run)) + r.outputQueue.Enqueue(ctx, job) } @@ -109,3 +116,17 @@ func (r triggerResultProcessorWorker) emitMismatchEndpointEvent(ctx context.Cont r.handleError(job.Run, emitErr) } } + +func (r triggerResultProcessorWorker) handleExecutionResult(run test.Run) test.Run { + if run.TriggerResult.Error != nil { + run = run.TriggerFailed(fmt.Errorf(run.TriggerResult.Error.ErrorMessage)) + + analytics.SendEvent("test_run_finished", "error", "", &map[string]string{ + "finalState": string(run.State), + }) + + return run + } + + return run.SuccessfullyTriggered() +} diff --git a/server/executor/workers/trigger_preparation_worker.go b/server/executor/workers/trigger_preparation_worker.go deleted file mode 100644 index be17f1e2f0..0000000000 --- a/server/executor/workers/trigger_preparation_worker.go +++ /dev/null @@ -1,24 +0,0 @@ -package workers - -import ( - "context" - - "github.com/kubeshop/tracetest/server/executor" - triggerer "github.com/kubeshop/tracetest/server/executor/trigger" - "github.com/kubeshop/tracetest/server/model" - "github.com/kubeshop/tracetest/server/test" -) - -type EventEmitter interface { - Emit(ctx context.Context, event model.TestRunEvent) error -} - -type TriggerPreparationWorker struct { - runRepository test.RunRepository - eventEmiter EventEmitter - triggers *triggerer.Registry -} - -func (w *TriggerPreparationWorker) ProcessItem(ctx context.Context, job executor.Job) { - -} diff --git a/server/test/trigger/trigger.go b/server/test/trigger/trigger.go index 025834cbea..758c84258a 100644 --- a/server/test/trigger/trigger.go +++ b/server/test/trigger/trigger.go @@ -15,7 +15,7 @@ type ( TriggerResult struct { Type TriggerType `json:"type"` - HTTP *HTTPResponse `json:"httpRequest,omitempty"` + HTTP *HTTPResponse `json:"http,omitempty"` GRPC *GRPCResponse `json:"grpc,omitempty"` TraceID *TraceIDResponse `json:"traceid,omitempty"` Kafka *KafkaResponse `json:"kafka,omitempty"`