From 7bb25a45eca066639af0568c2b0f59fa198d9682 Mon Sep 17 00:00:00 2001 From: Daniel Dias Date: Fri, 15 Sep 2023 13:47:13 -0300 Subject: [PATCH 1/5] feat(server): add options for test pipeline --- server/app/test_pipeline.go | 6 ++++- server/config/server.go | 24 +++++++++++++++++++ server/executor/linter_runner.go | 2 +- .../tracepollerworker/fetcher_worker.go | 9 ++++++- server/executor/trigger_executer_worker.go | 8 +++++++ 5 files changed, 46 insertions(+), 3 deletions(-) diff --git a/server/app/test_pipeline.go b/server/app/test_pipeline.go index 7195c27ad7..31cd4eb53e 100644 --- a/server/app/test_pipeline.go +++ b/server/app/test_pipeline.go @@ -2,6 +2,7 @@ package app import ( "github.com/jackc/pgx/v5/pgxpool" + "github.com/kubeshop/tracetest/server/config" "github.com/kubeshop/tracetest/server/datastore" "github.com/kubeshop/tracetest/server/executor" "github.com/kubeshop/tracetest/server/executor/pollingprofile" @@ -30,6 +31,7 @@ func buildTestPipeline( subscriptionManager *subscription.Manager, triggerRegistry *trigger.Registry, tracedbFactory tracedb.FactoryFunc, + appConfig config.AppConfig, ) *executor.TestPipeline { eventEmitter := executor.NewEventEmitter(treRepo, subscriptionManager) @@ -45,7 +47,7 @@ func buildTestPipeline( eventEmitter, ) - linterRunner := executor.NewlinterRunner( + linterRunner := executor.NewLinterRunner( execTestUpdater, subscriptionManager, eventEmitter, @@ -68,6 +70,7 @@ func buildTestPipeline( execTestUpdater, subscriptionManager, tracer, + appConfig.TestPipelineTraceFetchingEnabled(), ) tracePollerEvaluatorWorker := tracepollerworker.NewEvaluatorWorker( @@ -94,6 +97,7 @@ func buildTestPipeline( execTestUpdater, tracer, eventEmitter, + appConfig.TestPipelineTriggerExecutionEnabled(), ) triggerResultProcessorWorker := executor.NewTriggerResultProcessorWorker( diff --git a/server/config/server.go b/server/config/server.go index 2f29e999f6..f6e85232af 100644 --- a/server/config/server.go +++ b/server/config/server.go @@ -71,6 +71,16 @@ var serverOptions = options{ description: "internal telemetry otel collector (used for internal testing)", validate: nil, }, + { + key: "testPipelines.triggerExecute.enabled", + defaultValue: true, + description: "enable local trigger execution", + }, + { + key: "testPipelines.traceFetch.enabled", + defaultValue: true, + description: "enable local trace fetching", + }, } func init() { @@ -130,3 +140,17 @@ func (c *AppConfig) InternalTelemetryOtelCollectorAddress() string { return c.vp.GetString("internalTelemetry.otelCollectorEndpoint") } + +func (c *AppConfig) TestPipelineTriggerExecutionEnabled() bool { + c.mu.Lock() + defer c.mu.Unlock() + + return c.vp.GetBool("testPipelines.triggerExecute.enabled") +} + +func (c *AppConfig) TestPipelineTraceFetchingEnabled() bool { + c.mu.Lock() + defer c.mu.Unlock() + + return c.vp.GetBool("testPipelines.traceFetch.enabled") +} diff --git a/server/executor/linter_runner.go b/server/executor/linter_runner.go index b754ce2d13..77b9650ea1 100644 --- a/server/executor/linter_runner.go +++ b/server/executor/linter_runner.go @@ -26,7 +26,7 @@ type defaultLinterRunner struct { outputQueue pipeline.Enqueuer[Job] } -func NewlinterRunner( +func NewLinterRunner( updater RunUpdater, subscriptionManager *subscription.Manager, eventEmitter EventEmitter, diff --git a/server/executor/tracepollerworker/fetcher_worker.go b/server/executor/tracepollerworker/fetcher_worker.go index b0ae007190..9456e046a0 100644 --- a/server/executor/tracepollerworker/fetcher_worker.go +++ b/server/executor/tracepollerworker/fetcher_worker.go @@ -16,6 +16,7 @@ import ( type traceFetcherWorker struct { state *workerState outputQueue pipeline.Enqueuer[executor.Job] + enabled bool } func NewFetcherWorker( @@ -25,6 +26,7 @@ func NewFetcherWorker( updater executor.RunUpdater, subscriptionManager *subscription.Manager, tracer trace.Tracer, + enabled bool, ) *traceFetcherWorker { state := &workerState{ eventEmitter: eventEmitter, @@ -35,7 +37,7 @@ func NewFetcherWorker( tracer: tracer, } - return &traceFetcherWorker{state: state} + return &traceFetcherWorker{state: state, enabled: enabled} } func (w *traceFetcherWorker) SetInputQueue(queue pipeline.Enqueuer[executor.Job]) { @@ -47,6 +49,11 @@ func (w *traceFetcherWorker) SetOutputQueue(queue pipeline.Enqueuer[executor.Job } func (w *traceFetcherWorker) ProcessItem(ctx context.Context, job executor.Job) { + if !w.enabled { + w.outputQueue.Enqueue(ctx, job) + return + } + ctx, span := w.state.tracer.Start(ctx, "Fetching trace") defer span.End() diff --git a/server/executor/trigger_executer_worker.go b/server/executor/trigger_executer_worker.go index 2552ee41a4..3f1e065c50 100644 --- a/server/executor/trigger_executer_worker.go +++ b/server/executor/trigger_executer_worker.go @@ -21,12 +21,14 @@ func NewTriggerExecuterWorker( updater RunUpdater, tracer trace.Tracer, eventEmitter EventEmitter, + enabled bool, ) *triggerExecuterWorker { return &triggerExecuterWorker{ triggers: triggers, updater: updater, tracer: tracer, eventEmitter: eventEmitter, + enabled: enabled, } } @@ -36,6 +38,7 @@ type triggerExecuterWorker struct { tracer trace.Tracer eventEmitter EventEmitter outputQueue pipeline.Enqueuer[Job] + enabled bool } func (r *triggerExecuterWorker) SetOutputQueue(queue pipeline.Enqueuer[Job]) { @@ -55,6 +58,11 @@ func (r triggerExecuterWorker) handleError(run test.Run, err error) { } func (r triggerExecuterWorker) ProcessItem(ctx context.Context, job Job) { + if !r.enabled { + r.outputQueue.Enqueue(ctx, job) + return + } + err := r.eventEmitter.Emit(ctx, events.TriggerExecutionStart(job.Run.TestID, job.Run.ID)) if err != nil { r.handleError(job.Run, err) From 1333e1b2d18f310c9fdc34a555bd70fe8248e138 Mon Sep 17 00:00:00 2001 From: Daniel Dias Date: Fri, 15 Sep 2023 13:50:17 -0300 Subject: [PATCH 2/5] fix --- server/app/app.go | 1 + server/app/test_pipeline.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/app/app.go b/server/app/app.go index 952f74493f..b449505acf 100644 --- a/server/app/app.go +++ b/server/app/app.go @@ -245,6 +245,7 @@ func (app *App) Start(opts ...appOption) error { subscriptionManager, triggerRegistry, tracedbFactory, + app.cfg, ) testPipeline.Start() app.registerStopFn(func() { diff --git a/server/app/test_pipeline.go b/server/app/test_pipeline.go index 31cd4eb53e..d777c9cdfd 100644 --- a/server/app/test_pipeline.go +++ b/server/app/test_pipeline.go @@ -31,7 +31,7 @@ func buildTestPipeline( subscriptionManager *subscription.Manager, triggerRegistry *trigger.Registry, tracedbFactory tracedb.FactoryFunc, - appConfig config.AppConfig, + appConfig *config.AppConfig, ) *executor.TestPipeline { eventEmitter := executor.NewEventEmitter(treRepo, subscriptionManager) From ba35d90cda0c028b2d3737df565d16de67879d3e Mon Sep 17 00:00:00 2001 From: Daniel Dias Date: Fri, 15 Sep 2023 13:53:40 -0300 Subject: [PATCH 3/5] adding tests --- server/config/server_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/server/config/server_test.go b/server/config/server_test.go index 052a4ddfa5..73cea02de3 100644 --- a/server/config/server_test.go +++ b/server/config/server_test.go @@ -36,6 +36,8 @@ func TestServerConfig(t *testing.T) { "--experimentalFeatures", "b", "--internalTelemetry.enabled", "true", "--internalTelemetry.otelCollectorEndpoint", "otel-collector.tracetest", + "--testPipelines.triggerExecute.enabled", "false", + "--testPipelines.traceFetch.enabled", "false", } cfg := configWithFlags(t, flags) @@ -49,6 +51,9 @@ func TestServerConfig(t *testing.T) { assert.Equal(t, true, cfg.InternalTelemetryEnabled()) assert.Equal(t, "otel-collector.tracetest", cfg.InternalTelemetryOtelCollectorAddress()) + + assert.Equal(t, false, cfg.TestPipelineTriggerExecutionEnabled()) + assert.Equal(t, false, cfg.TestPipelineTraceFetchingEnabled()) }) t.Run("EnvVars", func(t *testing.T) { @@ -64,6 +69,8 @@ func TestServerConfig(t *testing.T) { "TRACETEST_EXPERIMENTALFEATURES": "a b", "TRACETEST_INTERNALTELEMETRY_ENABLED": "true", "TRACETEST_INTERNALTELEMETRY_OTELCOLLECTORENDPOINT": "otel-collector.tracetest", + "TRACETEST_TESTPIPELINES_TRIGGEREXECUTE_ENABLED": "false", + "TRACETEST_TESTPIPELINES_TRACEFETCH_ENABLED": "false", } cfg := configWithEnv(t, env) @@ -77,5 +84,8 @@ func TestServerConfig(t *testing.T) { assert.Equal(t, true, cfg.InternalTelemetryEnabled()) assert.Equal(t, "otel-collector.tracetest", cfg.InternalTelemetryOtelCollectorAddress()) + + assert.Equal(t, false, cfg.TestPipelineTriggerExecutionEnabled()) + assert.Equal(t, false, cfg.TestPipelineTraceFetchingEnabled()) }) } From 24adb7212c84ed4a378eb6945cc422c30ea288da Mon Sep 17 00:00:00 2001 From: Daniel Dias Date: Fri, 15 Sep 2023 15:14:19 -0300 Subject: [PATCH 4/5] Fixed flag config --- server/config/server.go | 11 +++++++---- server/config/server_test.go | 3 +++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/server/config/server.go b/server/config/server.go index f6e85232af..a069c2145f 100644 --- a/server/config/server.go +++ b/server/config/server.go @@ -73,12 +73,12 @@ var serverOptions = options{ }, { key: "testPipelines.triggerExecute.enabled", - defaultValue: true, + defaultValue: "true", description: "enable local trigger execution", }, { key: "testPipelines.traceFetch.enabled", - defaultValue: true, + defaultValue: "true", description: "enable local trace fetching", }, } @@ -145,12 +145,15 @@ func (c *AppConfig) TestPipelineTriggerExecutionEnabled() bool { c.mu.Lock() defer c.mu.Unlock() - return c.vp.GetBool("testPipelines.triggerExecute.enabled") + // this config needs to be a string because pflags + // has a strage bug that ignores this field when + // it is set as false + return c.vp.GetString("testPipelines.triggerExecute.enabled") == "true" } func (c *AppConfig) TestPipelineTraceFetchingEnabled() bool { c.mu.Lock() defer c.mu.Unlock() - return c.vp.GetBool("testPipelines.traceFetch.enabled") + return c.vp.GetString("testPipelines.traceFetch.enabled") == "true" } diff --git a/server/config/server_test.go b/server/config/server_test.go index 73cea02de3..f6362b2bbb 100644 --- a/server/config/server_test.go +++ b/server/config/server_test.go @@ -20,6 +20,9 @@ func TestServerConfig(t *testing.T) { assert.Equal(t, false, cfg.InternalTelemetryEnabled()) assert.Equal(t, "", cfg.InternalTelemetryOtelCollectorAddress()) + + assert.Equal(t, true, cfg.TestPipelineTriggerExecutionEnabled()) + assert.Equal(t, true, cfg.TestPipelineTraceFetchingEnabled()) }) t.Run("Flags", func(t *testing.T) { From 4f91221e6524ffe886ad5acbc4c9b12141ec2c80 Mon Sep 17 00:00:00 2001 From: Daniel Dias Date: Fri, 15 Sep 2023 15:21:13 -0300 Subject: [PATCH 5/5] Fixing worked enabled condition --- server/executor/tracepollerworker/fetcher_worker.go | 1 - server/executor/trigger_executer_worker.go | 1 - 2 files changed, 2 deletions(-) diff --git a/server/executor/tracepollerworker/fetcher_worker.go b/server/executor/tracepollerworker/fetcher_worker.go index 9456e046a0..1c18b42862 100644 --- a/server/executor/tracepollerworker/fetcher_worker.go +++ b/server/executor/tracepollerworker/fetcher_worker.go @@ -50,7 +50,6 @@ func (w *traceFetcherWorker) SetOutputQueue(queue pipeline.Enqueuer[executor.Job func (w *traceFetcherWorker) ProcessItem(ctx context.Context, job executor.Job) { if !w.enabled { - w.outputQueue.Enqueue(ctx, job) return } diff --git a/server/executor/trigger_executer_worker.go b/server/executor/trigger_executer_worker.go index 3f1e065c50..bcd9c35a00 100644 --- a/server/executor/trigger_executer_worker.go +++ b/server/executor/trigger_executer_worker.go @@ -59,7 +59,6 @@ func (r triggerExecuterWorker) handleError(run test.Run, err error) { func (r triggerExecuterWorker) ProcessItem(ctx context.Context, job Job) { if !r.enabled { - r.outputQueue.Enqueue(ctx, job) return }