Skip to content

Commit

Permalink
Add observable instruments to periodicreader tests (#2428)
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas authored Dec 17, 2024
1 parent 4e52554 commit 6a3b04d
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,4 +782,134 @@ mod tests {
"Metrics should be available in exporter."
);
}

async fn some_async_function() -> u64 {
// No dependency on any particular async runtime.
std::thread::sleep(std::time::Duration::from_millis(1));
1
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn async_inside_observable_callback_from_tokio_multi_with_one_worker() {
async_inside_observable_callback_helper();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn async_inside_observable_callback_from_tokio_multi_with_two_worker() {
async_inside_observable_callback_helper();
}

#[tokio::test(flavor = "current_thread")]
async fn async_inside_observable_callback_from_tokio_current_thread() {
async_inside_observable_callback_helper();
}

#[test]
fn async_inside_observable_callback_from_regular_main() {
async_inside_observable_callback_helper();
}

fn async_inside_observable_callback_helper() {
let interval = std::time::Duration::from_millis(10);
let exporter = InMemoryMetricExporter::default();
let reader = PeriodicReader::builder(exporter.clone())
.with_interval(interval)
.build();

let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter("test");
let _gauge = meter
.u64_observable_gauge("my_observable_gauge")
.with_callback(|observer| {
// using futures_executor::block_on intentionally and avoiding
// any particular async runtime.
let value = futures_executor::block_on(some_async_function());
observer.observe(value, &[]);
})
.build();

meter_provider.force_flush().expect("flush should succeed");
let exported_metrics = exporter
.get_finished_metrics()
.expect("this should not fail");
assert!(
!exported_metrics.is_empty(),
"Metrics should be available in exporter."
);
}

async fn some_tokio_async_function() -> u64 {
// Tokio specific async function
tokio::time::sleep(Duration::from_millis(1)).await;
1
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]

async fn tokio_async_inside_observable_callback_from_tokio_multi_with_one_worker() {
tokio_async_inside_observable_callback_helper(true);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn tokio_async_inside_observable_callback_from_tokio_multi_with_two_worker() {
tokio_async_inside_observable_callback_helper(true);
}

#[tokio::test(flavor = "current_thread")]
#[ignore] //TODO: Investigate if this can be fixed.
async fn tokio_async_inside_observable_callback_from_tokio_current_thread() {
tokio_async_inside_observable_callback_helper(true);
}

#[test]
fn tokio_async_inside_observable_callback_from_regular_main() {
tokio_async_inside_observable_callback_helper(false);
}

fn tokio_async_inside_observable_callback_helper(use_current_tokio_runtime: bool) {
let interval = std::time::Duration::from_millis(10);
let exporter = InMemoryMetricExporter::default();
let reader = PeriodicReader::builder(exporter.clone())
.with_interval(interval)
.build();

let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter("test");

if use_current_tokio_runtime {
let rt = tokio::runtime::Handle::current().clone();
let _gauge = meter
.u64_observable_gauge("my_observable_gauge")
.with_callback(move |observer| {
// call tokio specific async function from here
let value = rt.block_on(some_tokio_async_function());
observer.observe(value, &[]);
})
.build();
// rt here is a reference to the current tokio runtime.
// Droppng it occurs when the tokio::main itself ends.
} else {
let rt = tokio::runtime::Runtime::new().unwrap();
let _gauge = meter
.u64_observable_gauge("my_observable_gauge")
.with_callback(move |observer| {
// call tokio specific async function from here
let value = rt.block_on(some_tokio_async_function());
observer.observe(value, &[]);
})
.build();
// rt is not dropped here as it is moved to the closure,
// and is dropped only when MeterProvider itself is dropped.
// This works when called from normal main.
};

meter_provider.force_flush().expect("flush should succeed");
let exported_metrics = exporter
.get_finished_metrics()
.expect("this should not fail");
assert!(
!exported_metrics.is_empty(),
"Metrics should be available in exporter."
);
}
}

0 comments on commit 6a3b04d

Please sign in to comment.