From 6257f3b28a940c3502dd3450eac1a3101988302e Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Wed, 18 Dec 2024 09:52:16 +0200 Subject: [PATCH] Don't start components until Run is called --- .../otelcol/internal/scheduler/scheduler.go | 51 +++++++++++++++++-- .../component/otelcol/processor/processor.go | 6 +-- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 4649bda52..9eeec8da4 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -37,13 +37,33 @@ type Scheduler struct { schedMut sync.Mutex schedComponents []otelcomponent.Component // Most recently created components host otelcomponent.Host + running bool + + // onPause is called when scheduler is making changes to running components. + onPause func() + // onResume is called when scheduler is done making changes to running components. + onResume func() } +// TODO: Delete this function? I don't think it's used anywhere. // New creates a new unstarted Scheduler. Call Run to start it, and call // Schedule to schedule components to run. func New(l log.Logger) *Scheduler { return &Scheduler{ - log: l, + log: l, + onPause: func() {}, + onResume: func() {}, + } +} + +// TODO: Rename to "New"? +// TODO: Write a new comment to explain what this method does. +func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { + //TODO: Instead of assuming that the scheduler is paused, just call onPause() here. + return &Scheduler{ + log: l, + onPause: onPause, + onResume: onResume, } } @@ -57,15 +77,23 @@ func (cs *Scheduler) Schedule(ctx context.Context, h otelcomponent.Host, cc ...o cs.schedMut.Lock() defer cs.schedMut.Unlock() - // Stop the old components before running new scheduled ones. - cs.stopComponents(ctx, cs.schedComponents...) + cs.schedComponents = cc + cs.host = h + + if !cs.running { + return + } - level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cc)) - cs.schedComponents = cs.startComponents(ctx, h, cc...) + cs.runScheduled(ctx) } // Run starts the Scheduler and stops the components when the context is cancelled. func (cs *Scheduler) Run(ctx context.Context) error { + cs.schedMut.Lock() + cs.running = true + cs.runScheduled(ctx) + cs.schedMut.Unlock() + // Make sure we terminate all of our running components on shutdown. defer func() { cs.schedMut.Lock() @@ -77,6 +105,19 @@ func (cs *Scheduler) Run(ctx context.Context) error { return nil } +func (cs *Scheduler) runScheduled(ctx context.Context) { + cs.onPause() + + // Stop the old components before running new scheduled ones. + cs.stopComponents(ctx, cs.schedComponents...) + + level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents)) + cs.schedComponents = cs.startComponents(ctx, cs.host, cs.schedComponents...) + //TODO: Check if there were errors? What if the trace component failed but the metrics one didn't? Should we resume all consumers? + + cs.onResume() +} + func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) { for _, c := range cc { if err := c.Shutdown(ctx); err != nil { diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 312369f07..089b4ca6d 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -117,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID), @@ -139,6 +139,8 @@ func (p *Processor) Run(ctx context.Context) error { // configuration for OpenTelemetry Collector processor configuration and manage // the underlying OpenTelemetry Collector processor. func (p *Processor) Update(args component.Arguments) error { + //TODO: Lock a mutex? There could be a race condition with multiple calls to Update + p.args = args.(Arguments) host := scheduler.NewHost( @@ -238,10 +240,8 @@ func (p *Processor) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - p.consumer.Pause() p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) p.sched.Schedule(p.ctx, host, components...) - p.consumer.Resume() return nil }