diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index bb9b3a151e..dad87402eb 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -214,9 +214,11 @@ func (p *Connector) Update(args component.Arguments) error { return errors.New("unsupported connector type") } - // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) + // Pause the consumer, because we need to scheduler to run the new components before they receive any traffic. + p.consumer.Pause() p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + // Schedule the components. The scheduler will resume the consumer once the components are running. + p.sched.Schedule(host, components...) return nil } diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index 22a2b56d3a..f9cc62861c 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -242,9 +242,11 @@ func (e *Exporter) Update(args component.Arguments) error { } } - // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) + // Pause the consumer, because we need to scheduler to run the new components before they receive any traffic. + e.consumer.Pause() e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + // Schedule the components. The scheduler will resume the consumer once the components are running. + e.sched.Schedule(host, components...) return nil } diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 5072d65233..948e383abd 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -237,9 +237,11 @@ func (p *Processor) Update(args component.Arguments) error { } } - // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) + // Pause the consumer, because we need to scheduler to run the new components before they receive any traffic. + p.consumer.Pause() p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + // Schedule the components. The scheduler will resume the consumer once the components are running. + p.sched.Schedule(host, components...) return nil }