diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 9a720497ca..304b6736dd 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -117,9 +117,6 @@ where /// return metric data to the user. It will not automatically send that data to /// the exporter outside of the predefined interval. /// -/// As this spuns up own background thread, this is recommended to be used with push exporters -/// that do not require any particular async runtime. As of now, this cannot be used with -/// OTLP exporters as they requires async runtime /// /// [collect]: MetricReader::collect /// @@ -160,7 +157,7 @@ impl PeriodicReader { mpsc::channel(); let reader = PeriodicReader { inner: Arc::new(PeriodicReaderInner { - message_sender: Arc::new(Mutex::new(message_sender)), + message_sender: Arc::new(message_sender), is_shutdown: AtomicBool::new(false), producer: Mutex::new(None), exporter: Arc::new(exporter), @@ -223,6 +220,11 @@ impl PeriodicReader { } else { response_sender.send(true).unwrap(); } + + otel_debug!( + name: "PeriodReaderThreadExiting", + reason = "ShutdownRequested" + ); break; } Err(mpsc::RecvTimeoutError::Timeout) => { @@ -255,8 +257,13 @@ impl PeriodicReader { interval_start = Instant::now(); } } - Err(_) => { - // Some other error. Break out and exit the thread. + Err(mpsc::RecvTimeoutError::Disconnected) => { + // Channel disconnected, only thing to do is break + // out (i.e exit the thread) + otel_debug!( + name: "PeriodReaderThreadExiting", + reason = "MessageReceiverDisconnected" + ); break; } } @@ -271,6 +278,7 @@ impl PeriodicReader { if let Err(e) = result_thread_creation { otel_error!( name: "PeriodReaderThreadStartError", + message = "Failed to start PeriodicReader thread. Metrics will not be exported.", error = format!("{:?}", e) ); } @@ -290,7 +298,7 @@ impl fmt::Debug for PeriodicReader { struct PeriodicReaderInner { exporter: Arc, - message_sender: Arc>>, + message_sender: Arc>, producer: Mutex>>, is_shutdown: AtomicBool, } @@ -374,20 +382,9 @@ impl PeriodicReaderInner { return Err(MetricError::Other("reader is shut down".into())); } let (response_tx, response_rx) = mpsc::channel(); - match self.message_sender.lock() { - Ok(sender) => { - sender - .send(Message::Flush(response_tx)) - .map_err(|e| MetricError::Other(e.to_string()))?; - } - Err(e) => { - otel_debug!( - name: "PeriodReaderForceFlushError", - error = format!("{:?}", e) - ); - return Err(MetricError::Other(e.to_string())); - } - } + self.message_sender + .send(Message::Flush(response_tx)) + .map_err(|e| MetricError::Other(e.to_string()))?; if let Ok(response) = response_rx.recv() { // TODO: call exporter's force_flush method. @@ -408,20 +405,9 @@ impl PeriodicReaderInner { // TODO: See if this is better to be created upfront. let (response_tx, response_rx) = mpsc::channel(); - match self.message_sender.lock() { - Ok(sender) => { - sender - .send(Message::Shutdown(response_tx)) - .map_err(|e| MetricError::Other(e.to_string()))?; - } - Err(e) => { - otel_debug!( - name: "PeriodReaderShutdownError", - error = format!("{:?}", e) - ); - return Err(MetricError::Other(e.to_string())); - } - } + self.message_sender + .send(Message::Shutdown(response_tx)) + .map_err(|e| MetricError::Other(e.to_string()))?; if let Ok(response) = response_rx.recv() { self.is_shutdown