Skip to content

Commit

Permalink
Fix race condition in otelcol wrappers
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed Dec 10, 2024
1 parent b88ed5e commit 6bdd276
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
6 changes: 4 additions & 2 deletions internal/component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,11 @@ func (p *Connector) Update(args component.Arguments) error {
return errors.New("unsupported connector type")
}

// Schedule the components to run once our component is running.
p.sched.Schedule(host, components...)
// Pause the consumer, because we need to scheduler to run the new components before they receive any traffic.
p.consumer.Pause()
p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector)
// Schedule the components. The scheduler will resume the consumer once the components are running.
p.sched.Schedule(host, components...)
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions internal/component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,11 @@ func (e *Exporter) Update(args component.Arguments) error {
}
}

// Schedule the components to run once our component is running.
e.sched.Schedule(host, components...)
// Pause the consumer, because we need to scheduler to run the new components before they receive any traffic.
e.consumer.Pause()
e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter)
// Schedule the components. The scheduler will resume the consumer once the components are running.
e.sched.Schedule(host, components...)
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions internal/component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,11 @@ func (p *Processor) Update(args component.Arguments) error {
}
}

// Schedule the components to run once our component is running.
p.sched.Schedule(host, components...)
// Pause the consumer, because we need to scheduler to run the new components before they receive any traffic.
p.consumer.Pause()
p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor)
// Schedule the components. The scheduler will resume the consumer once the components are running.
p.sched.Schedule(host, components...)
return nil
}

Expand Down

0 comments on commit 6bdd276

Please sign in to comment.