diff --git a/internal/component/otelcol/auth/auth.go b/internal/component/otelcol/auth/auth.go index eb20434dec..e333e61122 100644 --- a/internal/component/otelcol/auth/auth.go +++ b/internal/component/otelcol/auth/auth.go @@ -198,7 +198,7 @@ func (a *Auth) Update(args component.Arguments) error { }) // Schedule the components to run once our component is running. - a.sched.Schedule(host, components...) + a.sched.Schedule(a.ctx, host, components...) return nil } diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index bb9b3a151e..a0eda435f0 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -117,7 +117,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn factory: f, consumer: consumer, - sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), + sched: scheduler.New(opts.Logger), collector: collector, } if err := p.Update(args); err != nil { @@ -215,8 +215,10 @@ func (p *Connector) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) + p.consumer.Pause() p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + p.sched.Schedule(p.ctx, host, components...) + p.consumer.Resume() return nil } diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index 22a2b56d3a..b63d0c3487 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -131,7 +131,7 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments, support factory: f, consumer: consumer, - sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), + sched: scheduler.New(opts.Logger), collector: collector, supportedSignals: supportedSignals, @@ -243,8 +243,10 @@ func (e *Exporter) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) + e.consumer.Pause() e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + e.sched.Schedule(e.ctx, host, components...) + e.consumer.Resume() return nil } diff --git a/internal/component/otelcol/extension/extension.go b/internal/component/otelcol/extension/extension.go index 1e494e71c1..457e0a9b17 100644 --- a/internal/component/otelcol/extension/extension.go +++ b/internal/component/otelcol/extension/extension.go @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) + e.sched.Schedule(e.ctx, host, components...) return nil } diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 2c731616a5..4649bda527 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -37,37 +37,13 @@ type Scheduler struct { schedMut sync.Mutex schedComponents []otelcomponent.Component // Most recently created components host otelcomponent.Host - - // newComponentsCh is written to when schedComponents gets updated. - newComponentsCh chan struct{} - - // onPause is called when scheduler is making changes to running components. - onPause func() - // onResume is called when scheduler is done making changes to running components. - onResume func() } // New creates a new unstarted Scheduler. Call Run to start it, and call // Schedule to schedule components to run. func New(l log.Logger) *Scheduler { return &Scheduler{ - log: l, - newComponentsCh: make(chan struct{}, 1), - onPause: func() {}, - onResume: func() {}, - } -} - -// NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to -// start paused and only when its components are scheduled, it will call onResume. From then on, each update to running -// components via Schedule method will trigger a call to onPause and then onResume. When scheduler is shutting down, it -// will call onResume as a last step. -func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { - return &Scheduler{ - log: l, - newComponentsCh: make(chan struct{}, 1), - onPause: onPause, - onResume: onResume, + log: l, } } @@ -77,63 +53,28 @@ func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Sched // Schedule completely overrides the set of previously running components; // components which have been removed since the last call to Schedule will be // stopped. -func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) { +func (cs *Scheduler) Schedule(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) { cs.schedMut.Lock() defer cs.schedMut.Unlock() - cs.schedComponents = cc - cs.host = h + // Stop the old components before running new scheduled ones. + cs.stopComponents(ctx, cs.schedComponents...) - select { - case cs.newComponentsCh <- struct{}{}: - // Queued new message. - default: - // A message is already queued for refreshing running components so we - // don't have to do anything here. - } + level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cc)) + cs.schedComponents = cs.startComponents(ctx, h, cc...) } -// Run starts the Scheduler. Run will watch for schedule components to appear -// and run them, terminating previously running components if they exist. +// Run starts the Scheduler and stops the components when the context is cancelled. func (cs *Scheduler) Run(ctx context.Context) error { - firstRun := true - var components []otelcomponent.Component - // Make sure we terminate all of our running components on shutdown. defer func() { - if !firstRun { // always handle the callbacks correctly - cs.onPause() - } - cs.stopComponents(context.Background(), components...) - cs.onResume() + cs.schedMut.Lock() + defer cs.schedMut.Unlock() + cs.stopComponents(context.Background(), cs.schedComponents...) }() - // Wait for a write to cs.newComponentsCh. The initial list of components is - // always empty so there's nothing to do until cs.newComponentsCh is written - // to. - for { - select { - case <-ctx.Done(): - return nil - case <-cs.newComponentsCh: - if !firstRun { - cs.onPause() // do not pause on first run - } else { - firstRun = false - } - // Stop the old components before running new scheduled ones. - cs.stopComponents(ctx, components...) - - cs.schedMut.Lock() - components = cs.schedComponents - host := cs.host - cs.schedMut.Unlock() - - level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components)) - components = cs.startComponents(ctx, host, components...) - cs.onResume() - } - } + <-ctx.Done() + return nil } func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) { diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 469d679b7f..5a9a59bd78 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -5,10 +5,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" otelcomponent "go.opentelemetry.io/collector/component" - "go.uber.org/atomic" "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" "github.com/grafana/alloy/internal/runtime/componenttest" @@ -32,7 +30,7 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started trigger once it is // running. component, started, _ := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(context.Background(), h, component) require.NoError(t, started.Wait(5*time.Second), "component did not start") }) @@ -52,68 +50,15 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(context.Background(), h, component) // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(h) + cs.Schedule(context.Background(), h) require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") }) - t.Run("Pause callbacks are called", func(t *testing.T) { - var ( - pauseCalls = &atomic.Int32{} - resumeCalls = &atomic.Int32{} - l = util.TestLogger(t) - cs = scheduler.NewWithPauseCallbacks( - l, - func() { pauseCalls.Inc() }, - func() { resumeCalls.Inc() }, - ) - h = scheduler.NewHost(l) - ) - ctx, cancel := context.WithCancel(context.Background()) - - // Run our scheduler in the background. - go func() { - err := cs.Run(ctx) - require.NoError(t, err) - }() - - // Schedule our component, which should notify the started and stopped - // trigger once it starts and stops respectively. - component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) - - toInt := func(a *atomic.Int32) int { return int(a.Load()) } - - require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 0, toInt(pauseCalls), "pause callbacks should not be called on first run") - assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on first run") - }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") - - // Wait for the component to start, and then unschedule all components, which - // should cause our running component to terminate. - require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(h) - - require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 1, toInt(pauseCalls), "pause callback should be called on second run") - assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on second run") - }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") - - require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") - - // Stop the scheduler - cancel() - - require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 2, toInt(pauseCalls), "pause callback should be called on shutdown") - assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on shutdown") - }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") - }) - t.Run("Running components get stopped on shutdown", func(t *testing.T) { var ( l = util.TestLogger(t) @@ -133,7 +78,7 @@ func TestScheduler(t *testing.T) { // Schedule our component which will notify our trigger when Shutdown gets // called. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(ctx, h, component) // Wait for the component to start, and then stop our scheduler, which // should cause our running component to terminate. diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 5072d65233..312369f079 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -117,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc factory: f, consumer: consumer, - sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), + sched: scheduler.New(opts.Logger), collector: collector, liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID), @@ -238,8 +238,11 @@ func (p *Processor) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) + p.consumer.Pause() p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + p.sched.Schedule(p.ctx, host, components...) + p.consumer.Resume() + return nil } diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index 80b82efb06..673552d271 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - r.sched.Schedule(host, components...) + r.sched.Schedule(r.ctx, host, components...) return nil }