Skip to content

Commit

Permalink
Minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Dec 18, 2024
1 parent 3a0b4f6 commit 60aeb74
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 15 deletions.
2 changes: 1 addition & 1 deletion internal/component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,
}
if err := p.Update(args); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments, support
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,

supportedSignals: supportedSignals,
Expand Down
40 changes: 29 additions & 11 deletions internal/component/otelcol/internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type Scheduler struct {
onResume func()
}

// TODO: Delete this function? I don't think it's used anywhere.
// New creates a new unstarted Scheduler. Call Run to start it, and call
// Schedule to schedule components to run.
func New(l log.Logger) *Scheduler {
Expand All @@ -56,23 +55,25 @@ func New(l log.Logger) *Scheduler {
}
}

// TODO: Rename to "New"?
// TODO: Write a new comment to explain what this method does.
// NewWithPauseCallbacks is like New, but allows to specify onPause() and onResume() callbacks.
// The callbacks are a useful way of pausing and resuming the ingestion of data by the components:
// * onPause() is called before the scheduler stops the components.
// * onResume() is called after the scheduler starts the components.
// The callbacks are used by the Schedule() and Run() functions.
// The scheduler is assumed to start paused; Schedule() won't call onPause() if Run() was never ran.
func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler {
//TODO: Instead of assuming that the scheduler is paused, just call onPause() here.
return &Scheduler{
log: l,
onPause: onPause,
onResume: onResume,
}
}

// Schedule schedules a new set of OpenTelemetry Components to run. Components
// will only be scheduled when the Scheduler is running.
// Schedule a new set of OpenTelemetry Components to run.
// Components will only be started when the Scheduler's Run() function has been called.
//
// Schedule completely overrides the set of previously running components;
// components which have been removed since the last call to Schedule will be
// stopped.
// Schedule() completely overrides the set of previously running components.
// Components which have been removed since the last call to Schedule will be stopped.
func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) {
cs.schedMut.Lock()
defer cs.schedMut.Unlock()
Expand All @@ -86,18 +87,35 @@ func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h ote
return
}

// The new components must be setup in this order:
//
// 1. Pause consumers
// 2. Stop the old components
// 3. Change the consumers
// 4. Start the new components
// 5. Start the consumer
//
// There could be race conditions if the order above is not followed.

// 1. Pause consumers
// This prevents them from accepting new data while we're shutting them down.
cs.onPause()

// Stop the old components before running new scheduled ones.
// 2. Stop the old components
cs.stopComponents(ctx, cs.schedComponents...)

// 3. Change the consumers
// This is can only be done after stopping the pervious components and before starting the new ones.
updateConsumers()

// 4. Start the new components
level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents))
cs.schedComponents = cs.startComponents(ctx, h, cc...)
cs.host = h
//TODO: Check if there were errors? What if the trace component failed but the metrics one didn't? Should we resume all consumers?
//TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers?

// 5. Start the consumer
// The new components will now start accepting telemetry data.
cs.onResume()
}

Expand Down
2 changes: 0 additions & 2 deletions internal/component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ func (p *Processor) Run(ctx context.Context) error {
// configuration for OpenTelemetry Collector processor configuration and manage
// the underlying OpenTelemetry Collector processor.
func (p *Processor) Update(args component.Arguments) error {
//TODO: Lock a mutex? There could be a race condition with multiple calls to Update

p.args = args.(Arguments)

host := scheduler.NewHost(
Expand Down

0 comments on commit 60aeb74

Please sign in to comment.