diff --git a/server/app/app.go b/server/app/app.go index 952f74493f..b449505acf 100644 --- a/server/app/app.go +++ b/server/app/app.go @@ -245,6 +245,7 @@ func (app *App) Start(opts ...appOption) error { subscriptionManager, triggerRegistry, tracedbFactory, + app.cfg, ) testPipeline.Start() app.registerStopFn(func() { diff --git a/server/app/test_pipeline.go b/server/app/test_pipeline.go index 7195c27ad7..d777c9cdfd 100644 --- a/server/app/test_pipeline.go +++ b/server/app/test_pipeline.go @@ -2,6 +2,7 @@ package app import ( "github.com/jackc/pgx/v5/pgxpool" + "github.com/kubeshop/tracetest/server/config" "github.com/kubeshop/tracetest/server/datastore" "github.com/kubeshop/tracetest/server/executor" "github.com/kubeshop/tracetest/server/executor/pollingprofile" @@ -30,6 +31,7 @@ func buildTestPipeline( subscriptionManager *subscription.Manager, triggerRegistry *trigger.Registry, tracedbFactory tracedb.FactoryFunc, + appConfig *config.AppConfig, ) *executor.TestPipeline { eventEmitter := executor.NewEventEmitter(treRepo, subscriptionManager) @@ -45,7 +47,7 @@ func buildTestPipeline( eventEmitter, ) - linterRunner := executor.NewlinterRunner( + linterRunner := executor.NewLinterRunner( execTestUpdater, subscriptionManager, eventEmitter, @@ -68,6 +70,7 @@ func buildTestPipeline( execTestUpdater, subscriptionManager, tracer, + appConfig.TestPipelineTraceFetchingEnabled(), ) tracePollerEvaluatorWorker := tracepollerworker.NewEvaluatorWorker( @@ -94,6 +97,7 @@ func buildTestPipeline( execTestUpdater, tracer, eventEmitter, + appConfig.TestPipelineTriggerExecutionEnabled(), ) triggerResultProcessorWorker := executor.NewTriggerResultProcessorWorker( diff --git a/server/config/server.go b/server/config/server.go index 2f29e999f6..a069c2145f 100644 --- a/server/config/server.go +++ b/server/config/server.go @@ -71,6 +71,16 @@ var serverOptions = options{ description: "internal telemetry otel collector (used for internal testing)", validate: nil, }, + { + key: "testPipelines.triggerExecute.enabled", + defaultValue: "true", + description: "enable local trigger execution", + }, + { + key: "testPipelines.traceFetch.enabled", + defaultValue: "true", + description: "enable local trace fetching", + }, } func init() { @@ -130,3 +140,20 @@ func (c *AppConfig) InternalTelemetryOtelCollectorAddress() string { return c.vp.GetString("internalTelemetry.otelCollectorEndpoint") } + +func (c *AppConfig) TestPipelineTriggerExecutionEnabled() bool { + c.mu.Lock() + defer c.mu.Unlock() + + // this config needs to be a string because pflags + // has a strage bug that ignores this field when + // it is set as false + return c.vp.GetString("testPipelines.triggerExecute.enabled") == "true" +} + +func (c *AppConfig) TestPipelineTraceFetchingEnabled() bool { + c.mu.Lock() + defer c.mu.Unlock() + + return c.vp.GetString("testPipelines.traceFetch.enabled") == "true" +} diff --git a/server/config/server_test.go b/server/config/server_test.go index 052a4ddfa5..f6362b2bbb 100644 --- a/server/config/server_test.go +++ b/server/config/server_test.go @@ -20,6 +20,9 @@ func TestServerConfig(t *testing.T) { assert.Equal(t, false, cfg.InternalTelemetryEnabled()) assert.Equal(t, "", cfg.InternalTelemetryOtelCollectorAddress()) + + assert.Equal(t, true, cfg.TestPipelineTriggerExecutionEnabled()) + assert.Equal(t, true, cfg.TestPipelineTraceFetchingEnabled()) }) t.Run("Flags", func(t *testing.T) { @@ -36,6 +39,8 @@ func TestServerConfig(t *testing.T) { "--experimentalFeatures", "b", "--internalTelemetry.enabled", "true", "--internalTelemetry.otelCollectorEndpoint", "otel-collector.tracetest", + "--testPipelines.triggerExecute.enabled", "false", + "--testPipelines.traceFetch.enabled", "false", } cfg := configWithFlags(t, flags) @@ -49,6 +54,9 @@ func TestServerConfig(t *testing.T) { assert.Equal(t, true, cfg.InternalTelemetryEnabled()) assert.Equal(t, "otel-collector.tracetest", cfg.InternalTelemetryOtelCollectorAddress()) + + assert.Equal(t, false, cfg.TestPipelineTriggerExecutionEnabled()) + assert.Equal(t, false, cfg.TestPipelineTraceFetchingEnabled()) }) t.Run("EnvVars", func(t *testing.T) { @@ -64,6 +72,8 @@ func TestServerConfig(t *testing.T) { "TRACETEST_EXPERIMENTALFEATURES": "a b", "TRACETEST_INTERNALTELEMETRY_ENABLED": "true", "TRACETEST_INTERNALTELEMETRY_OTELCOLLECTORENDPOINT": "otel-collector.tracetest", + "TRACETEST_TESTPIPELINES_TRIGGEREXECUTE_ENABLED": "false", + "TRACETEST_TESTPIPELINES_TRACEFETCH_ENABLED": "false", } cfg := configWithEnv(t, env) @@ -77,5 +87,8 @@ func TestServerConfig(t *testing.T) { assert.Equal(t, true, cfg.InternalTelemetryEnabled()) assert.Equal(t, "otel-collector.tracetest", cfg.InternalTelemetryOtelCollectorAddress()) + + assert.Equal(t, false, cfg.TestPipelineTriggerExecutionEnabled()) + assert.Equal(t, false, cfg.TestPipelineTraceFetchingEnabled()) }) } diff --git a/server/executor/linter_runner.go b/server/executor/linter_runner.go index b754ce2d13..77b9650ea1 100644 --- a/server/executor/linter_runner.go +++ b/server/executor/linter_runner.go @@ -26,7 +26,7 @@ type defaultLinterRunner struct { outputQueue pipeline.Enqueuer[Job] } -func NewlinterRunner( +func NewLinterRunner( updater RunUpdater, subscriptionManager *subscription.Manager, eventEmitter EventEmitter, diff --git a/server/executor/tracepollerworker/fetcher_worker.go b/server/executor/tracepollerworker/fetcher_worker.go index b0ae007190..1c18b42862 100644 --- a/server/executor/tracepollerworker/fetcher_worker.go +++ b/server/executor/tracepollerworker/fetcher_worker.go @@ -16,6 +16,7 @@ import ( type traceFetcherWorker struct { state *workerState outputQueue pipeline.Enqueuer[executor.Job] + enabled bool } func NewFetcherWorker( @@ -25,6 +26,7 @@ func NewFetcherWorker( updater executor.RunUpdater, subscriptionManager *subscription.Manager, tracer trace.Tracer, + enabled bool, ) *traceFetcherWorker { state := &workerState{ eventEmitter: eventEmitter, @@ -35,7 +37,7 @@ func NewFetcherWorker( tracer: tracer, } - return &traceFetcherWorker{state: state} + return &traceFetcherWorker{state: state, enabled: enabled} } func (w *traceFetcherWorker) SetInputQueue(queue pipeline.Enqueuer[executor.Job]) { @@ -47,6 +49,10 @@ func (w *traceFetcherWorker) SetOutputQueue(queue pipeline.Enqueuer[executor.Job } func (w *traceFetcherWorker) ProcessItem(ctx context.Context, job executor.Job) { + if !w.enabled { + return + } + ctx, span := w.state.tracer.Start(ctx, "Fetching trace") defer span.End() diff --git a/server/executor/trigger_executer_worker.go b/server/executor/trigger_executer_worker.go index 2552ee41a4..bcd9c35a00 100644 --- a/server/executor/trigger_executer_worker.go +++ b/server/executor/trigger_executer_worker.go @@ -21,12 +21,14 @@ func NewTriggerExecuterWorker( updater RunUpdater, tracer trace.Tracer, eventEmitter EventEmitter, + enabled bool, ) *triggerExecuterWorker { return &triggerExecuterWorker{ triggers: triggers, updater: updater, tracer: tracer, eventEmitter: eventEmitter, + enabled: enabled, } } @@ -36,6 +38,7 @@ type triggerExecuterWorker struct { tracer trace.Tracer eventEmitter EventEmitter outputQueue pipeline.Enqueuer[Job] + enabled bool } func (r *triggerExecuterWorker) SetOutputQueue(queue pipeline.Enqueuer[Job]) { @@ -55,6 +58,10 @@ func (r triggerExecuterWorker) handleError(run test.Run, err error) { } func (r triggerExecuterWorker) ProcessItem(ctx context.Context, job Job) { + if !r.enabled { + return + } + err := r.eventEmitter.Emit(ctx, events.TriggerExecutionStart(job.Run.TestID, job.Run.ID)) if err != nil { r.handleError(job.Run, err)