Skip to content

Commit

Permalink
feat: report metrics about the test pipeline queues (#3213)
Browse files Browse the repository at this point in the history
* feat(server): report metrics about the test pipeline queues

* fix: test_suitte_runner_test
  • Loading branch information
mathnogueira authored Oct 4, 2023
1 parent 3ae60b0 commit 784ef19
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 5 deletions.
2 changes: 2 additions & 0 deletions server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ func (app *App) Start(opts ...appOption) error {
triggerRegistry,
tracedbFactory,
app.cfg,
meter,
)
testPipeline.Start()
app.registerStopFn(func() {
Expand All @@ -267,6 +268,7 @@ func (app *App) Start(opts ...appOption) error {
testSuiteRunRepository,
testPipeline,
subscriptionManager,
meter,
)

testSuitePipeline.Start()
Expand Down
5 changes: 4 additions & 1 deletion server/app/test_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/test"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -32,6 +33,7 @@ func buildTestPipeline(
triggerRegistry *trigger.Registry,
tracedbFactory tracedb.FactoryFunc,
appConfig *config.AppConfig,
meter metric.Meter,
) *executor.TestPipeline {
eventEmitter := executor.NewEventEmitter(treRepo, subscriptionManager)

Expand Down Expand Up @@ -116,7 +118,8 @@ func buildTestPipeline(
WithPollingProfileGetter(ppRepo).
WithTestGetter(testRepo).
WithRunGetter(runRepo).
WithInstanceID(instanceID)
WithInstanceID(instanceID).
WithMetricMeter(meter)

pgQueue := pipeline.NewPostgresQueueDriver[executor.Job](pool, pgChannelName)

Expand Down
5 changes: 4 additions & 1 deletion server/app/test_suite_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ import (
"github.com/kubeshop/tracetest/server/pkg/pipeline"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/testsuite"
"go.opentelemetry.io/otel/metric"
)

func buildTestSuitePipeline(
tranRepo *testsuite.Repository,
runRepo *testsuite.RunRepository,
testRunner *executor.TestPipeline,
subscriptionManager *subscription.Manager,
meter metric.Meter,
) *executor.TestSuitesPipeline {
tranRunner := executor.NewTestSuiteRunner(testRunner, runRepo, subscriptionManager)
queueBuilder := executor.NewQueueConfigurer().
WithTestSuiteGetter(tranRepo).
WithTestSuiteRunGetter(runRepo)
WithTestSuiteRunGetter(runRepo).
WithMetricMeter(meter)

pipeline := pipeline.New(queueBuilder,
pipeline.Step[executor.Job]{Processor: tranRunner, Driver: pipeline.NewInMemoryQueueDriver[executor.Job]("testSuiteRunner")},
Expand Down
10 changes: 10 additions & 0 deletions server/executor/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/test"
"github.com/kubeshop/tracetest/server/testsuite"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -187,6 +188,8 @@ type queueConfigurer[T any] struct {
pollingProfiles pollingProfileGetter
dataStores dataStoreGetter

meter metric.Meter

instanceID string
}

Expand Down Expand Up @@ -239,6 +242,11 @@ func (qb *queueConfigurer[T]) WithTestSuiteRunGetter(suiteRuns testSuiteRunGette
return qb
}

func (qb *queueConfigurer[T]) WithMetricMeter(meter metric.Meter) *queueConfigurer[T] {
qb.meter = meter
return qb
}

func (qb *queueConfigurer[T]) Configure(queue *pipeline.Queue[Job]) {
q := &Queue{
cancelRunHandlerFn: qb.cancelRunHandlerFn,
Expand All @@ -259,6 +267,8 @@ func (qb *queueConfigurer[T]) Configure(queue *pipeline.Queue[Job]) {
queue.EnqueuePreprocessorFn = q.enqueuePreprocess
queue.ListenPreprocessorFn = q.listenPreprocess

queue.InitializeMetrics(qb.meter)

}

type Queue struct {
Expand Down
4 changes: 3 additions & 1 deletion server/executor/test_suite_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/kubeshop/tracetest/server/variableset"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
)

type fakeTestRunner struct {
Expand Down Expand Up @@ -163,7 +164,8 @@ func runTestSuiteRunnerTest(t *testing.T, withErrors bool, assert func(t *testin

queueBuilder := executor.NewQueueConfigurer().
WithTestSuiteGetter(transactionsRepo).
WithTestSuiteRunGetter(transactionRunRepo)
WithTestSuiteRunGetter(transactionRunRepo).
WithMetricMeter(noop.NewMeterProvider().Meter("noop"))

pipeline := pipeline.New(queueBuilder,
pipeline.Step[executor.Job]{Processor: runner, Driver: pipeline.NewInMemoryQueueDriver[executor.Job]("testSuiteRunner")},
Expand Down
4 changes: 4 additions & 0 deletions server/pkg/pipeline/postgres_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,7 @@ func (ch *channel[T]) Enqueue(item T) {

ch.log("notified postgres")
}

func (ch *channel[T]) Name() string {
return ch.name
}
28 changes: 26 additions & 2 deletions server/pkg/pipeline/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"

"github.com/alitto/pond"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

type Enqueuer[T any] interface {
Expand All @@ -22,9 +24,16 @@ type QueueDriver[T any] interface {
SetListener(Listener[T])
}

type namedDriver interface {
Name() string
}

type Queue[T any] struct {
driver QueueDriver[T]
itemProcessor QueueItemProcessor[T]
name string
driver QueueDriver[T]
itemProcessor QueueItemProcessor[T]
enqueueCounter metric.Int64Counter
listenCounter metric.Int64Counter

EnqueuePreprocessorFn func(context.Context, T) T
ListenPreprocessorFn func(context.Context, T) (context.Context, T)
Expand All @@ -43,11 +52,20 @@ func NewQueue[T any](driver QueueDriver[T], itemProcessor QueueItemProcessor[T])
workerPool: pond.New(QueueWorkerCount, QueueWorkerBufferSize),
}

if namedDriver, ok := driver.(namedDriver); ok {
queue.name = namedDriver.Name()
}

queue.SetDriver(driver)

return queue
}

func (q *Queue[T]) InitializeMetrics(meter metric.Meter) {
q.enqueueCounter, _ = meter.Int64Counter("messaging.enqueue.count")
q.listenCounter, _ = meter.Int64Counter("messaging.listen.count")
}

func (q *Queue[T]) SetDriver(driver QueueDriver[T]) {
q.driver = driver
driver.SetListener(q)
Expand All @@ -67,6 +85,9 @@ func (q Queue[T]) Enqueue(ctx context.Context, item T) {
item = q.EnqueuePreprocessorFn(ctx, item)
}

q.enqueueCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("queue.name", q.name),
))
q.driver.Enqueue(item)
})
}
Expand All @@ -86,6 +107,9 @@ func (q Queue[T]) Listen(item T) {
}

q.workerPool.Submit(func() {
q.listenCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("queue.name", q.name),
))
q.itemProcessor.ProcessItem(ctx, item)
})
}
Expand Down

0 comments on commit 784ef19

Please sign in to comment.