Skip to content

Commit

Permalink
Small improvements to PeriodicReader (#2421)
Browse files Browse the repository at this point in the history
Co-authored-by: Lalit Kumar Bhasin <[email protected]>
  • Loading branch information
cijothomas and lalitb authored Dec 13, 2024
1 parent ce550e3 commit 9b0ccce
Showing 1 changed file with 21 additions and 35 deletions.
56 changes: 21 additions & 35 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ where
/// return metric data to the user. It will not automatically send that data to
/// the exporter outside of the predefined interval.
///
/// As this spuns up own background thread, this is recommended to be used with push exporters
/// that do not require any particular async runtime. As of now, this cannot be used with
/// OTLP exporters as they requires async runtime
///
/// [collect]: MetricReader::collect
///
Expand Down Expand Up @@ -160,7 +157,7 @@ impl PeriodicReader {
mpsc::channel();
let reader = PeriodicReader {
inner: Arc::new(PeriodicReaderInner {
message_sender: Arc::new(Mutex::new(message_sender)),
message_sender: Arc::new(message_sender),
is_shutdown: AtomicBool::new(false),
producer: Mutex::new(None),
exporter: Arc::new(exporter),
Expand Down Expand Up @@ -223,6 +220,11 @@ impl PeriodicReader {
} else {
response_sender.send(true).unwrap();
}

otel_debug!(
name: "PeriodReaderThreadExiting",
reason = "ShutdownRequested"
);
break;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
Expand Down Expand Up @@ -255,8 +257,13 @@ impl PeriodicReader {
interval_start = Instant::now();
}
}
Err(_) => {
// Some other error. Break out and exit the thread.
Err(mpsc::RecvTimeoutError::Disconnected) => {
// Channel disconnected, only thing to do is break
// out (i.e exit the thread)
otel_debug!(
name: "PeriodReaderThreadExiting",
reason = "MessageReceiverDisconnected"
);
break;
}
}
Expand All @@ -271,6 +278,7 @@ impl PeriodicReader {
if let Err(e) = result_thread_creation {
otel_error!(
name: "PeriodReaderThreadStartError",
message = "Failed to start PeriodicReader thread. Metrics will not be exported.",
error = format!("{:?}", e)
);
}
Expand All @@ -290,7 +298,7 @@ impl fmt::Debug for PeriodicReader {

struct PeriodicReaderInner {
exporter: Arc<dyn PushMetricExporter>,
message_sender: Arc<Mutex<mpsc::Sender<Message>>>,
message_sender: Arc<mpsc::Sender<Message>>,
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
is_shutdown: AtomicBool,
}
Expand Down Expand Up @@ -374,20 +382,9 @@ impl PeriodicReaderInner {
return Err(MetricError::Other("reader is shut down".into()));
}
let (response_tx, response_rx) = mpsc::channel();
match self.message_sender.lock() {
Ok(sender) => {
sender
.send(Message::Flush(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;
}
Err(e) => {
otel_debug!(
name: "PeriodReaderForceFlushError",
error = format!("{:?}", e)
);
return Err(MetricError::Other(e.to_string()));
}
}
self.message_sender
.send(Message::Flush(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;

if let Ok(response) = response_rx.recv() {
// TODO: call exporter's force_flush method.
Expand All @@ -408,20 +405,9 @@ impl PeriodicReaderInner {

// TODO: See if this is better to be created upfront.
let (response_tx, response_rx) = mpsc::channel();
match self.message_sender.lock() {
Ok(sender) => {
sender
.send(Message::Shutdown(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;
}
Err(e) => {
otel_debug!(
name: "PeriodReaderShutdownError",
error = format!("{:?}", e)
);
return Err(MetricError::Other(e.to_string()));
}
}
self.message_sender
.send(Message::Shutdown(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;

if let Ok(response) = response_rx.recv() {
self.is_shutdown
Expand Down

0 comments on commit 9b0ccce

Please sign in to comment.