Skip to content

Commit

Permalink
make otel scheduler sync
Browse files (browse the repository at this point in the history)
  • Loading branch information
wildum committed Dec 11, 2024
1 parent b88ed5e commit b7bf45b
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 139 deletions.
2 changes: 1 addition & 1 deletion internal/component/otelcol/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (a *Auth) Update(args component.Arguments) error {
})

// Schedule the components to run once our component is running.
a.sched.Schedule(host, components...)
a.sched.Schedule(a.ctx, host, components...)
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions internal/component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn
factory: f,
consumer: consumer,

sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
sched: scheduler.New(opts.Logger),
collector: collector,
}
if err := p.Update(args); err != nil {
Expand Down Expand Up @@ -215,8 +215,10 @@ func (p *Connector) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
p.sched.Schedule(host, components...)
p.consumer.Pause()
p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector)
p.sched.Schedule(p.ctx, host, components...)
p.consumer.Resume()
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions internal/component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments, support
factory: f,
consumer: consumer,

sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
sched: scheduler.New(opts.Logger),
collector: collector,

supportedSignals: supportedSignals,
Expand Down Expand Up @@ -243,8 +243,10 @@ func (e *Exporter) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
e.sched.Schedule(host, components...)
e.consumer.Pause()
e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter)
e.sched.Schedule(e.ctx, host, components...)
e.consumer.Resume()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
e.sched.Schedule(host, components...)
e.sched.Schedule(e.ctx, host, components...)
return nil
}

Expand Down
83 changes: 12 additions & 71 deletions internal/component/otelcol/internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,13 @@ type Scheduler struct {
schedMut sync.Mutex
schedComponents []otelcomponent.Component // Most recently created components
host otelcomponent.Host

// newComponentsCh is written to when schedComponents gets updated.
newComponentsCh chan struct{}

// onPause is called when scheduler is making changes to running components.
onPause func()
// onResume is called when scheduler is done making changes to running components.
onResume func()
}

// New creates a new unstarted Scheduler. Call Run to start it, and call
// Schedule to schedule components to run.
func New(l log.Logger) *Scheduler {
return &Scheduler{
log: l,
newComponentsCh: make(chan struct{}, 1),
onPause: func() {},
onResume: func() {},
}
}

// NewWithPauseCallbacks is like New, but allows specifying onPause and onResume callbacks. The scheduler is assumed to
// start paused; it calls onResume only once its components have been scheduled. From then on, each update to the running
// components via the Schedule method triggers a call to onPause followed by onResume. When the scheduler is shutting
// down, it calls onResume as its last step.
func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler {
return &Scheduler{
log: l,
newComponentsCh: make(chan struct{}, 1),
onPause: onPause,
onResume: onResume,
log: l,
}
}

Expand All @@ -77,63 +53,28 @@ func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Sched
// Schedule completely overrides the set of previously running components;
// components which have been removed since the last call to Schedule will be
// stopped.
func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) {
func (cs *Scheduler) Schedule(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) {
cs.schedMut.Lock()
defer cs.schedMut.Unlock()

cs.schedComponents = cc
cs.host = h
// Stop the old components before running new scheduled ones.
cs.stopComponents(ctx, cs.schedComponents...)

select {
case cs.newComponentsCh <- struct{}{}:
// Queued new message.
default:
// A message is already queued for refreshing running components so we
// don't have to do anything here.
}
level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cc))
cs.schedComponents = cs.startComponents(ctx, h, cc...)
}

// Run starts the Scheduler. Run will watch for schedule components to appear
// and run them, terminating previously running components if they exist.
// Run starts the Scheduler and stops the components when the context is cancelled.
func (cs *Scheduler) Run(ctx context.Context) error {
firstRun := true
var components []otelcomponent.Component

// Make sure we terminate all of our running components on shutdown.
defer func() {
if !firstRun { // always handle the callbacks correctly
cs.onPause()
}
cs.stopComponents(context.Background(), components...)
cs.onResume()
cs.schedMut.Lock()
defer cs.schedMut.Unlock()
cs.stopComponents(context.Background(), cs.schedComponents...)
}()

// Wait for a write to cs.newComponentsCh. The initial list of components is
// always empty so there's nothing to do until cs.newComponentsCh is written
// to.
for {
select {
case <-ctx.Done():
return nil
case <-cs.newComponentsCh:
if !firstRun {
cs.onPause() // do not pause on first run
} else {
firstRun = false
}
// Stop the old components before running new scheduled ones.
cs.stopComponents(ctx, components...)

cs.schedMut.Lock()
components = cs.schedComponents
host := cs.host
cs.schedMut.Unlock()

level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components))
components = cs.startComponents(ctx, host, components...)
cs.onResume()
}
}
<-ctx.Done()
return nil
}

func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) {
Expand Down
63 changes: 4 additions & 59 deletions internal/component/otelcol/internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
otelcomponent "go.opentelemetry.io/collector/component"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component/otelcol/internal/scheduler"
"github.com/grafana/alloy/internal/runtime/componenttest"
Expand All @@ -32,7 +30,7 @@ func TestScheduler(t *testing.T) {
// Schedule our component, which should notify the started trigger once it is
// running.
component, started, _ := newTriggerComponent()
cs.Schedule(h, component)
cs.Schedule(context.Background(), h, component)
require.NoError(t, started.Wait(5*time.Second), "component did not start")
})

Expand All @@ -52,68 +50,15 @@ func TestScheduler(t *testing.T) {
// Schedule our component, which should notify the started and stopped
// trigger once it starts and stops respectively.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)
cs.Schedule(context.Background(), h, component)

// Wait for the component to start, and then unschedule all components, which
// should cause our running component to terminate.
require.NoError(t, started.Wait(5*time.Second), "component did not start")
cs.Schedule(h)
cs.Schedule(context.Background(), h)
require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")
})

t.Run("Pause callbacks are called", func(t *testing.T) {
var (
pauseCalls = &atomic.Int32{}
resumeCalls = &atomic.Int32{}
l = util.TestLogger(t)
cs = scheduler.NewWithPauseCallbacks(
l,
func() { pauseCalls.Inc() },
func() { resumeCalls.Inc() },
)
h = scheduler.NewHost(l)
)
ctx, cancel := context.WithCancel(context.Background())

// Run our scheduler in the background.
go func() {
err := cs.Run(ctx)
require.NoError(t, err)
}()

// Schedule our component, which should notify the started and stopped
// trigger once it starts and stops respectively.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)

toInt := func(a *atomic.Int32) int { return int(a.Load()) }

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 0, toInt(pauseCalls), "pause callbacks should not be called on first run")
assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on first run")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")

// Wait for the component to start, and then unschedule all components, which
// should cause our running component to terminate.
require.NoError(t, started.Wait(5*time.Second), "component did not start")
cs.Schedule(h)

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 1, toInt(pauseCalls), "pause callback should be called on second run")
assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on second run")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")

require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")

// Stop the scheduler
cancel()

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 2, toInt(pauseCalls), "pause callback should be called on shutdown")
assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on shutdown")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")
})

t.Run("Running components get stopped on shutdown", func(t *testing.T) {
var (
l = util.TestLogger(t)
Expand All @@ -133,7 +78,7 @@ func TestScheduler(t *testing.T) {
// Schedule our component which will notify our trigger when Shutdown gets
// called.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)
cs.Schedule(ctx, h, component)

// Wait for the component to start, and then stop our scheduler, which
// should cause our running component to terminate.
Expand Down
7 changes: 5 additions & 2 deletions internal/component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc
factory: f,
consumer: consumer,

sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
sched: scheduler.New(opts.Logger),
collector: collector,

liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID),
Expand Down Expand Up @@ -238,8 +238,11 @@ func (p *Processor) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
p.sched.Schedule(host, components...)
p.consumer.Pause()
p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor)
p.sched.Schedule(p.ctx, host, components...)
p.consumer.Resume()

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
r.sched.Schedule(host, components...)
r.sched.Schedule(r.ctx, host, components...)
return nil
}

Expand Down

0 comments on commit b7bf45b

Please sign in to comment.