Skip to content

Commit

Permalink
Don't start components until Run is called
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Dec 18, 2024
1 parent b7bf45b commit 6257f3b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 8 deletions.
51 changes: 46 additions & 5 deletions internal/component/otelcol/internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,33 @@ type Scheduler struct {
schedMut sync.Mutex
schedComponents []otelcomponent.Component // Most recently created components
host otelcomponent.Host
running bool

// onPause is called when scheduler is making changes to running components.
onPause func()
// onResume is called when scheduler is done making changes to running components.
onResume func()
}

// TODO: Delete this function? I don't think it's used anywhere.
// New creates a new unstarted Scheduler. Call Run to start it, and call
// Schedule to schedule components to run.
func New(l log.Logger) *Scheduler {
return &Scheduler{
log: l,
log: l,
onPause: func() {},
onResume: func() {},
}
}

// TODO: Rename to "New"?
// TODO: Write a new comment to explain what this method does.
func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler {
//TODO: Instead of assuming that the scheduler is paused, just call onPause() here.
return &Scheduler{
log: l,
onPause: onPause,
onResume: onResume,
}
}

Expand All @@ -57,15 +77,23 @@ func (cs *Scheduler) Schedule(ctx context.Context, h otelcomponent.Host, cc ...o
cs.schedMut.Lock()
defer cs.schedMut.Unlock()

// Stop the old components before running new scheduled ones.
cs.stopComponents(ctx, cs.schedComponents...)
cs.schedComponents = cc
cs.host = h

if !cs.running {
return
}

level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cc))
cs.schedComponents = cs.startComponents(ctx, h, cc...)
cs.runScheduled(ctx)
}

// Run starts the Scheduler and stops the components when the context is cancelled.
func (cs *Scheduler) Run(ctx context.Context) error {
cs.schedMut.Lock()
cs.running = true
cs.runScheduled(ctx)
cs.schedMut.Unlock()

// Make sure we terminate all of our running components on shutdown.
defer func() {
cs.schedMut.Lock()
Expand All @@ -77,6 +105,19 @@ func (cs *Scheduler) Run(ctx context.Context) error {
return nil
}

func (cs *Scheduler) runScheduled(ctx context.Context) {
cs.onPause()

// Stop the old components before running new scheduled ones.
cs.stopComponents(ctx, cs.schedComponents...)

level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents))
cs.schedComponents = cs.startComponents(ctx, cs.host, cs.schedComponents...)
//TODO: Check if there were errors? What if the trace component failed but the metrics one didn't? Should we resume all consumers?

cs.onResume()
}

func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) {
for _, c := range cc {
if err := c.Shutdown(ctx); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,

liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID),
Expand All @@ -139,6 +139,8 @@ func (p *Processor) Run(ctx context.Context) error {
// configuration for OpenTelemetry Collector processor configuration and manage
// the underlying OpenTelemetry Collector processor.
func (p *Processor) Update(args component.Arguments) error {
//TODO: Lock a mutex? There could be a race condition with multiple calls to Update

p.args = args.(Arguments)

host := scheduler.NewHost(
Expand Down Expand Up @@ -238,10 +240,8 @@ func (p *Processor) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
p.consumer.Pause()
p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor)
p.sched.Schedule(p.ctx, host, components...)
p.consumer.Resume()

return nil
}
Expand Down

0 comments on commit 6257f3b

Please sign in to comment.