Skip to content

Commit

Permalink
Merge branch 'benchmark-batch' of github.com:lalitb/opentelemetry-rus…
Browse files Browse the repository at this point in the history
…t into benchmark-batch
  • Loading branch information
lalitb committed Aug 14, 2024
2 parents f0eb120 + 6959111 commit 0f4a5db
Show file tree
Hide file tree
Showing 28 changed files with 1,132 additions and 958 deletions.
2 changes: 1 addition & 1 deletion opentelemetry-appender-log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ where
if self.enabled(record.metadata()) {
let mut log_record = self.logger.create_log_record();
log_record.set_severity_number(severity_of_level(record.level()));
log_record.set_severity_text(record.level().as_str().into());
log_record.set_severity_text(record.level().as_str());
log_record.set_body(AnyValue::from(record.args().to_string()));
log_record.add_attributes(log_attributes(record.key_values()));
log_record.set_target(record.metadata().target().to_string());
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ where
log_record.set_target(meta.target().to_string());
log_record.set_event_name(meta.name());
log_record.set_severity_number(severity_of_level(meta.level()));
log_record.set_severity_text(meta.level().as_str().into());
log_record.set_severity_text(meta.level().as_str());
let mut visitor = EventVisitor::new(&mut log_record);
#[cfg(feature = "experimental_metadata_attributes")]
visitor.visit_experimental_metadata(meta);
Expand Down
3 changes: 3 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
ObservableCounter,UpDownCounter including
[#1517](https://github.com/open-telemetry/opentelemetry-rust/issues/1517).
[#2004](https://github.com/open-telemetry/opentelemetry-rust/pull/2004)
- Fixed a bug related to cumulative aggregation of `Gauge` measurements.
[#1975](https://github.com/open-telemetry/opentelemetry-rust/issues/1975).
[#2021](https://github.com/open-telemetry/opentelemetry-rust/pull/2021)

## v0.24.1

Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ fn logging_comparable_to_appender(c: &mut Criterion) {
log_record.set_target("my-target".to_string());
log_record.set_event_name("CheckoutFailed");
log_record.set_severity_number(Severity::Warn);
log_record.set_severity_text("WARN".into());
log_record.set_severity_text("WARN");
log_record.add_attribute("book_id", "12345");
log_record.add_attribute("book_title", "Rust Programming Adventures");
log_record.add_attribute("message", "Unable to process checkout.");
Expand Down Expand Up @@ -275,7 +275,7 @@ fn criterion_benchmark(c: &mut Criterion) {
log_record.set_timestamp(now);
log_record.set_observed_timestamp(now);
log_record.set_severity_number(Severity::Warn);
log_record.set_severity_text(Severity::Warn.name().into());
log_record.set_severity_text(Severity::Warn.name());
logger.emit(log_record);
});

Expand All @@ -285,7 +285,7 @@ fn criterion_benchmark(c: &mut Criterion) {
log_record.set_timestamp(now);
log_record.set_observed_timestamp(now);
log_record.set_severity_number(Severity::Warn);
log_record.set_severity_text(Severity::Warn.name().into());
log_record.set_severity_text(Severity::Warn.name());
log_record.add_attribute("name", "my-event-name");
log_record.add_attribute("event.id", 20);
log_record.add_attribute("user.name", "otel");
Expand All @@ -299,7 +299,7 @@ fn criterion_benchmark(c: &mut Criterion) {
log_record.set_timestamp(now);
log_record.set_observed_timestamp(now);
log_record.set_severity_number(Severity::Warn);
log_record.set_severity_text(Severity::Warn.name().into());
log_record.set_severity_text(Severity::Warn.name());
log_record.add_attribute("name", "my-event-name");
log_record.add_attribute("event.id", 20);
log_record.add_attribute("user.name", "otel");
Expand Down Expand Up @@ -338,7 +338,7 @@ fn criterion_benchmark(c: &mut Criterion) {
log_record.set_timestamp(now);
log_record.set_observed_timestamp(now);
log_record.set_severity_number(Severity::Warn);
log_record.set_severity_text(Severity::Warn.name().into());
log_record.set_severity_text(Severity::Warn.name());
log_record.add_attributes(attributes.clone());
logger.emit(log_record);
});
Expand Down
26 changes: 13 additions & 13 deletions opentelemetry-sdk/benches/metric_gauge.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
The benchmark results:
criterion = "0.5.1"
OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2)
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
OS: Ubuntu 22.04.4 LTS (5.15.153.1-microsoft-standard-WSL2)
Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz, 16vCPUs,
RAM: 64.0 GB
| Test | Average time|
|--------------------------------|-------------|
| Gauge_Add_4 | 586 ns |
| Gauge_Add | 178.37 ns |
*/

use criterion::{criterion_group, criterion_main, Criterion};
Expand All @@ -26,6 +26,11 @@ thread_local! {
static CURRENT_RNG: RefCell<rngs::SmallRng> = RefCell::new(rngs::SmallRng::from_entropy());
}

static ATTRIBUTE_VALUES: [&str; 10] = [
"value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9",
"value10",
];

// Run this benchmark with:
// cargo bench --bench metric_gauge
fn create_gauge() -> Gauge<u64> {
Expand All @@ -42,13 +47,8 @@ fn criterion_benchmark(c: &mut Criterion) {
}

fn gauge_record(c: &mut Criterion) {
let attribute_values = [
"value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9",
"value10",
];

let gauge = create_gauge();
c.bench_function("Gauge_Add_4", |b| {
c.bench_function("Gauge_Add", |b| {
b.iter(|| {
// 4*4*10*10 = 1600 time series.
let rands = CURRENT_RNG.with(|rng| {
Expand All @@ -67,10 +67,10 @@ fn gauge_record(c: &mut Criterion) {
gauge.record(
1,
&[
KeyValue::new("attribute1", attribute_values[index_first_attribute]),
KeyValue::new("attribute2", attribute_values[index_second_attribute]),
KeyValue::new("attribute3", attribute_values[index_third_attribute]),
KeyValue::new("attribute4", attribute_values[index_fourth_attribute]),
KeyValue::new("attribute1", ATTRIBUTE_VALUES[index_first_attribute]),
KeyValue::new("attribute2", ATTRIBUTE_VALUES[index_second_attribute]),
KeyValue::new("attribute3", ATTRIBUTE_VALUES[index_third_attribute]),
KeyValue::new("attribute4", ATTRIBUTE_VALUES[index_fourth_attribute]),
],
);
});
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ mod tests {
let logger = logger_provider.logger("test-logger");
let mut log_record = logger.create_log_record();
log_record.set_severity_number(Severity::Error);
log_record.set_severity_text("Error".into());
log_record.set_severity_text("Error");

// Adding attributes using a vector with explicitly constructed Key and AnyValue objects.
log_record.add_attributes(vec![
Expand Down
31 changes: 14 additions & 17 deletions opentelemetry-sdk/src/logs/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{borrow::Cow, time::SystemTime};
/// is provided to `LogExporter`s as input.
pub struct LogRecord {
/// Event name. Optional as not all the logging API support it.
pub event_name: Option<Cow<'static, str>>,
pub event_name: Option<&'static str>,

/// Target of the log record
pub target: Option<Cow<'static, str>>,
Expand All @@ -26,7 +26,8 @@ pub struct LogRecord {
pub trace_context: Option<TraceContext>,

/// The original severity string from the source
pub severity_text: Option<Cow<'static, str>>,
pub severity_text: Option<&'static str>,

/// The corresponding severity value, normalized
pub severity_number: Option<Severity>,

Expand All @@ -38,11 +39,8 @@ pub struct LogRecord {
}

impl opentelemetry::logs::LogRecord for LogRecord {
fn set_event_name<T>(&mut self, name: T)
where
T: Into<Cow<'static, str>>,
{
self.event_name = Some(name.into());
fn set_event_name(&mut self, name: &'static str) {
self.event_name = Some(name);
}

// Sets the `target` of a record
Expand All @@ -61,7 +59,7 @@ impl opentelemetry::logs::LogRecord for LogRecord {
self.observed_timestamp = Some(timestamp);
}

fn set_severity_text(&mut self, severity_text: Cow<'static, str>) {
fn set_severity_text(&mut self, severity_text: &'static str) {
self.severity_text = Some(severity_text);
}

Expand Down Expand Up @@ -154,7 +152,7 @@ mod tests {
fn test_set_eventname() {
let mut log_record = LogRecord::default();
log_record.set_event_name("test_event");
assert_eq!(log_record.event_name, Some(Cow::Borrowed("test_event")));
assert_eq!(log_record.event_name, Some("test_event"));
}

#[test]
Expand Down Expand Up @@ -183,9 +181,8 @@ mod tests {
#[test]
fn test_set_severity_text() {
let mut log_record = LogRecord::default();
let severity_text: Cow<'static, str> = "ERROR".into(); // Explicitly typed
log_record.set_severity_text(severity_text);
assert_eq!(log_record.severity_text, Some(Cow::Borrowed("ERROR")));
log_record.set_severity_text("ERROR");
assert_eq!(log_record.severity_text, Some("ERROR"));
}

#[test]
Expand Down Expand Up @@ -247,11 +244,11 @@ mod tests {
#[test]
fn compare_log_record() {
let log_record = LogRecord {
event_name: Some(Cow::Borrowed("test_event")),
event_name: Some("test_event"),
target: Some(Cow::Borrowed("foo::bar")),
timestamp: Some(SystemTime::now()),
observed_timestamp: Some(SystemTime::now()),
severity_text: Some(Cow::Borrowed("ERROR")),
severity_text: Some("ERROR"),
severity_number: Some(Severity::Error),
body: Some(AnyValue::String("Test body".into())),
attributes: Some(vec![(Key::new("key"), AnyValue::String("value".into()))]),
Expand All @@ -267,20 +264,20 @@ mod tests {
assert_eq!(log_record, log_record_cloned);

let mut log_record_different = log_record.clone();
log_record_different.event_name = Some(Cow::Borrowed("different_event"));
log_record_different.event_name = Some("different_event");

assert_ne!(log_record, log_record_different);
}

#[test]
fn compare_log_record_target_borrowed_eq_owned() {
let log_record_borrowed = LogRecord {
event_name: Some(Cow::Borrowed("test_event")),
event_name: Some("test_event"),
..Default::default()
};

let log_record_owned = LogRecord {
event_name: Some(Cow::Owned("test_event".to_string())),
event_name: Some("test_event"),
..Default::default()
};

Expand Down
88 changes: 4 additions & 84 deletions opentelemetry-sdk/src/metrics/instrument.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::{any::Any, borrow::Cow, collections::HashSet, hash::Hash, marker, sync::Arc};
use std::{any::Any, borrow::Cow, collections::HashSet, hash::Hash, sync::Arc};

use opentelemetry::{
metrics::{
AsyncInstrument, MetricsError, Result, SyncCounter, SyncGauge, SyncHistogram,
SyncUpDownCounter,
},
metrics::{AsyncInstrument, SyncCounter, SyncGauge, SyncHistogram, SyncUpDownCounter},
Key, KeyValue,
};

Expand All @@ -13,8 +10,6 @@ use crate::{
metrics::{aggregation::Aggregation, internal::Measure},
};

pub(crate) const EMPTY_MEASURE_MSG: &str = "no aggregators for observable instrument";

/// The identifier of a group of instruments that all perform the same function.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum InstrumentKind {
Expand Down Expand Up @@ -289,89 +284,14 @@ impl<T: Copy + 'static> SyncHistogram<T> for ResolvedMeasures<T> {
}
}

/// A comparable unique identifier of an observable.
#[derive(Clone, Debug)]
pub(crate) struct ObservableId<T> {
pub(crate) inner: IdInner,
_marker: marker::PhantomData<T>,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub(crate) struct IdInner {
/// The human-readable identifier of the instrument.
pub(crate) name: Cow<'static, str>,
/// describes the purpose of the instrument.
pub(crate) description: Cow<'static, str>,
/// The functional group of the instrument.
kind: InstrumentKind,
/// The unit of measurement recorded by the instrument.
pub(crate) unit: Cow<'static, str>,
/// The instrumentation that created the instrument.
scope: Scope,
}

impl<T> Hash for ObservableId<T> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.inner.hash(state)
}
}

impl<T> PartialEq for ObservableId<T> {
fn eq(&self, other: &Self) -> bool {
self.inner == other.inner
}
}

impl<T> Eq for ObservableId<T> {}

#[derive(Clone)]
pub(crate) struct Observable<T> {
pub(crate) id: ObservableId<T>,
measures: Vec<Arc<dyn Measure<T>>>,
}

impl<T> Observable<T> {
pub(crate) fn new(
scope: Scope,
kind: InstrumentKind,
name: Cow<'static, str>,
description: Cow<'static, str>,
unit: Cow<'static, str>,
measures: Vec<Arc<dyn Measure<T>>>,
) -> Self {
Self {
id: ObservableId {
inner: IdInner {
name,
description,
kind,
unit,
scope,
},
_marker: marker::PhantomData,
},
measures,
}
}

/// Returns `Err` if the observable should not be registered, and `Ok` if it
/// should.
///
/// An error is returned if this observable is effectively a no-op because it does not have
/// any aggregators. Also, an error is returned if scope defines a Meter other
/// than the observable it was created by.
pub(crate) fn registerable(&self, scope: &Scope) -> Result<()> {
if self.measures.is_empty() {
return Err(MetricsError::Other(EMPTY_MEASURE_MSG.into()));
}
if &self.id.inner.scope != scope {
return Err(MetricsError::Other(format!(
"invalid registration: observable {} from Meter {:?}, registered with Meter {}",
self.id.inner.name, self.id.inner.scope, scope.name,
)));
}

Ok(())
pub(crate) fn new(measures: Vec<Arc<dyn Measure<T>>>) -> Self {
Self { measures }
}
}

Expand Down
12 changes: 7 additions & 5 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,10 @@ impl<T: Number<T>> AggregateBuilder<T> {
}

/// Builds a last-value aggregate function input and output.
///
/// [Builder::temporality] is ignored and delta is always used.
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
let lv_filter = Arc::new(LastValue::new());
let lv_agg = Arc::clone(&lv_filter);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| lv_filter.measure(n, a)),
Expand All @@ -127,7 +124,12 @@ impl<T: Number<T>> AggregateBuilder<T> {
};
let g = g.unwrap_or_else(|| new_agg.as_mut().expect("present if g is none"));

lv_agg.compute_aggregation(&mut g.data_points);
match t {
Some(Temporality::Delta) => {
lv_agg.compute_aggregation_delta(&mut g.data_points)
}
_ => lv_agg.compute_aggregation_cumulative(&mut g.data_points),
}

(g.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
},
Expand Down
Loading

0 comments on commit 0f4a5db

Please sign in to comment.