Skip to content

Commit

Permalink
Make otel scheduler sync (#2262)
Browse files Browse the repository at this point in the history
* make otel scheduler sync

* Don't start components until Run is called (#2296)

* Don't start components until Run is called

* Update consumers after stopping the component

* Minor fixes

* Update internal/component/otelcol/internal/scheduler/scheduler.go

Co-authored-by: Piotr <[email protected]>

* test and docs

* add resume call on exist for robustness

---------

Co-authored-by: Paulin Todev <[email protected]>
Co-authored-by: Piotr <[email protected]>
  • Loading branch information
3 people authored Jan 2, 2025
1 parent 8a4af8b commit 8568931
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 87 deletions.
2 changes: 1 addition & 1 deletion internal/component/otelcol/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (a *Auth) Update(args component.Arguments) error {
})

// Schedule the components to run once our component is running.
a.sched.Schedule(host, components...)
a.sched.Schedule(a.ctx, func() {}, host, components...)
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions internal/component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,12 @@ func (p *Connector) Update(args component.Arguments) error {
return errors.New("unsupported connector type")
}

updateConsumersFunc := func() {
p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector)
}

// Schedule the components to run once our component is running.
p.sched.Schedule(host, components...)
p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector)
p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...)
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions internal/component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,12 @@ func (e *Exporter) Update(args component.Arguments) error {
}
}

updateConsumersFunc := func() {
e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter)
}

// Schedule the components to run once our component is running.
e.sched.Schedule(host, components...)
e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter)
e.sched.Schedule(e.ctx, updateConsumersFunc, host, components...)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
e.sched.Schedule(host, components...)
e.sched.Schedule(e.ctx, func() {}, host, components...)
return nil
}

Expand Down
137 changes: 74 additions & 63 deletions internal/component/otelcol/internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ type Scheduler struct {
schedMut sync.Mutex
schedComponents []otelcomponent.Component // Most recently created components
host otelcomponent.Host

// newComponentsCh is written to when schedComponents gets updated.
newComponentsCh chan struct{}
running bool

// onPause is called when scheduler is making changes to running components.
onPause func()
Expand All @@ -51,89 +49,102 @@ type Scheduler struct {
// Schedule to schedule components to run.
func New(l log.Logger) *Scheduler {
return &Scheduler{
log: l,
newComponentsCh: make(chan struct{}, 1),
onPause: func() {},
onResume: func() {},
log: l,
onPause: func() {},
onResume: func() {},
}
}

// NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to
// start paused and only when its components are scheduled, it will call onResume. From then on, each update to running
// components via Schedule method will trigger a call to onPause and then onResume. When scheduler is shutting down, it
// will call onResume as a last step.
// NewWithPauseCallbacks is like New, but allows to specify onPause() and onResume() callbacks.
// The callbacks are a useful way of pausing and resuming the ingestion of data by the components:
// * onPause() is called before the scheduler stops the components.
// * onResume() is called after the scheduler starts the components.
// The callbacks are used by the Schedule() and Run() functions.
// The scheduler is assumed to start paused; Schedule() won't call onPause() if Run() was never ran.
func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler {
return &Scheduler{
log: l,
newComponentsCh: make(chan struct{}, 1),
onPause: onPause,
onResume: onResume,
log: l,
onPause: onPause,
onResume: onResume,
}
}

// Schedule schedules a new set of OpenTelemetry Components to run. Components
// will only be scheduled when the Scheduler is running.
// Schedule a new set of OpenTelemetry Components to run.
// Components will only be started when the Scheduler's Run() function has been called.
//
// Schedule() completely overrides the set of previously running components.
// Components which have been removed since the last call to Schedule will be stopped.
//
// Schedule completely overrides the set of previously running components;
// components which have been removed since the last call to Schedule will be
// stopped.
func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) {
// updateConsumers is called after the components are paused and before starting the new components.
// It is expected that this function will set the new set of consumers to the wrapping consumer that's assigned to the Alloy component.
func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) {
cs.schedMut.Lock()
defer cs.schedMut.Unlock()

cs.schedComponents = cc
// If the scheduler isn't running yet, just update the state.
// That way the Run function is ready to go.
if !cs.running {
cs.schedComponents = cc
cs.host = h
updateConsumers()
return
}

// The new components must be setup in this order:
//
// 1. Pause consumers
// 2. Stop the old components
// 3. Change the consumers
// 4. Start the new components
// 5. Start the consumer
//
// There could be race conditions if the order above is not followed.

// 1. Pause consumers
// This prevents them from accepting new data while we're shutting them down.
cs.onPause()

// 2. Stop the old components
cs.stopComponents(ctx, cs.schedComponents...)

// 3. Change the consumers
// This can only be done after stopping the pervious components and before starting the new ones.
updateConsumers()

// 4. Start the new components
level.Debug(cs.log).Log("msg", "scheduling otelcol components", "count", len(cs.schedComponents))
cs.schedComponents = cs.startComponents(ctx, h, cc...)
cs.host = h
//TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers?

select {
case cs.newComponentsCh <- struct{}{}:
// Queued new message.
default:
// A message is already queued for refreshing running components so we
// don't have to do anything here.
}
// 5. Start the consumer
// The new components will now start accepting telemetry data.
cs.onResume()
}

// Run starts the Scheduler. Run will watch for schedule components to appear
// and run them, terminating previously running components if they exist.
// Run starts the Scheduler and stops the components when the context is cancelled.
func (cs *Scheduler) Run(ctx context.Context) error {
firstRun := true
var components []otelcomponent.Component
cs.schedMut.Lock()
cs.running = true

cs.onPause()
cs.startComponents(ctx, cs.host, cs.schedComponents...)
cs.onResume()

cs.schedMut.Unlock()

// Make sure we terminate all of our running components on shutdown.
defer func() {
if !firstRun { // always handle the callbacks correctly
cs.onPause()
}
cs.stopComponents(context.Background(), components...)
cs.schedMut.Lock()
defer cs.schedMut.Unlock()
cs.stopComponents(context.Background(), cs.schedComponents...)
// this Resume call should not be needed but is added for robustness to ensure that
// it does not ever exit in "paused" state.
cs.onResume()
}()

// Wait for a write to cs.newComponentsCh. The initial list of components is
// always empty so there's nothing to do until cs.newComponentsCh is written
// to.
for {
select {
case <-ctx.Done():
return nil
case <-cs.newComponentsCh:
if !firstRun {
cs.onPause() // do not pause on first run
} else {
firstRun = false
}
// Stop the old components before running new scheduled ones.
cs.stopComponents(ctx, components...)

cs.schedMut.Lock()
components = cs.schedComponents
host := cs.host
cs.schedMut.Unlock()

level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components))
components = cs.startComponents(ctx, host, components...)
cs.onResume()
}
}
<-ctx.Done()
return nil
}

func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) {
Expand Down
37 changes: 22 additions & 15 deletions internal/component/otelcol/internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"testing"
"time"

"go.uber.org/atomic"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
otelcomponent "go.opentelemetry.io/collector/component"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component/otelcol/internal/scheduler"
"github.com/grafana/alloy/internal/runtime/componenttest"
Expand All @@ -32,7 +33,7 @@ func TestScheduler(t *testing.T) {
// Schedule our component, which should notify the started trigger once it is
// running.
component, started, _ := newTriggerComponent()
cs.Schedule(h, component)
cs.Schedule(context.Background(), func() {}, h, component)
require.NoError(t, started.Wait(5*time.Second), "component did not start")
})

Expand All @@ -52,12 +53,12 @@ func TestScheduler(t *testing.T) {
// Schedule our component, which should notify the started and stopped
// trigger once it starts and stops respectively.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)
cs.Schedule(context.Background(), func() {}, h, component)

// Wait for the component to start, and then unschedule all components, which
// should cause our running component to terminate.
require.NoError(t, started.Wait(5*time.Second), "component did not start")
cs.Schedule(h)
cs.Schedule(context.Background(), func() {}, h)
require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")
})

Expand All @@ -81,26 +82,32 @@ func TestScheduler(t *testing.T) {
require.NoError(t, err)
}()

toInt := func(a *atomic.Int32) int { return int(a.Load()) }

// The Run function starts the components. They should be paused and then resumed.
require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 1, toInt(pauseCalls), "pause callbacks should be called on run")
assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on run")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")

// Schedule our component, which should notify the started and stopped
// trigger once it starts and stops respectively.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)

toInt := func(a *atomic.Int32) int { return int(a.Load()) }
cs.Schedule(ctx, func() {}, h, component)

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 0, toInt(pauseCalls), "pause callbacks should not be called on first run")
assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on first run")
assert.Equal(t, 2, toInt(pauseCalls), "pause callbacks should be called on schedule")
assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on schedule")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")

// Wait for the component to start, and then unschedule all components, which
// should cause our running component to terminate.
require.NoError(t, started.Wait(5*time.Second), "component did not start")
cs.Schedule(h)
cs.Schedule(ctx, func() {}, h)

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 1, toInt(pauseCalls), "pause callback should be called on second run")
assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on second run")
assert.Equal(t, 3, toInt(pauseCalls), "pause callback should be called on second schedule")
assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on second schedule")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")

require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")
Expand All @@ -109,8 +116,8 @@ func TestScheduler(t *testing.T) {
cancel()

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 2, toInt(pauseCalls), "pause callback should be called on shutdown")
assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on shutdown")
assert.Equal(t, 3, toInt(pauseCalls), "pause callback should not be called on shutdown")
assert.Equal(t, 4, toInt(resumeCalls), "resume callback should be called on shutdown")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")
})

Expand All @@ -133,7 +140,7 @@ func TestScheduler(t *testing.T) {
// Schedule our component which will notify our trigger when Shutdown gets
// called.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)
cs.Schedule(ctx, func() {}, h, component)

// Wait for the component to start, and then stop our scheduler, which
// should cause our running component to terminate.
Expand Down
8 changes: 6 additions & 2 deletions internal/component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,13 @@ func (p *Processor) Update(args component.Arguments) error {
}
}

updateConsumersFunc := func() {
p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor)
}

// Schedule the components to run once our component is running.
p.sched.Schedule(host, components...)
p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor)
p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...)

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
r.sched.Schedule(host, components...)
r.sched.Schedule(r.ctx, func() {}, host, components...)
return nil
}

Expand Down

0 comments on commit 8568931

Please sign in to comment.