diff --git a/.cspell.json b/.cspell.json index 23ecd03c8b..be8b4c0cda 100644 --- a/.cspell.json +++ b/.cspell.json @@ -49,7 +49,9 @@ "nocapture", "Ochtman", "opentelemetry", + "OTELCOL", "OTLP", + "periodicreader", "protoc", "quantile", "Redelmeier", @@ -59,6 +61,7 @@ "shoppingcart", "struct", "Tescher", + "testresults", "tracerprovider", "updown", "Zhongyang", diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2a060b7e60..34d979987d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -148,7 +148,7 @@ jobs: if: hashFiles('Cargo.lock') == '' run: cargo generate-lockfile - name: cargo llvm-cov - run: cargo llvm-cov --locked --all-features --workspace --lcov --output-path lcov.info + run: cargo llvm-cov --locked --all-features --workspace --lcov --lib --output-path lcov.info - name: Upload to codecov.io uses: codecov/codecov-action@v4 env: diff --git a/examples/logs-basic/Cargo.toml b/examples/logs-basic/Cargo.toml index 52b8182e31..00321af4fc 100644 --- a/examples/logs-basic/Cargo.toml +++ b/examples/logs-basic/Cargo.toml @@ -6,10 +6,8 @@ license = "Apache-2.0" publish = false [dependencies] -opentelemetry = { path = "../../opentelemetry", features = ["logs"] } opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["logs"] } opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["logs"]} opentelemetry-appender-tracing = { path = "../../opentelemetry-appender-tracing", default-features = false} -opentelemetry-semantic-conventions = { path = "../../opentelemetry-semantic-conventions" } tracing = { workspace = true, features = ["std"]} tracing-subscriber = { workspace = true, features = ["registry", "std"] } diff --git a/examples/metrics-advanced/Cargo.toml b/examples/metrics-advanced/Cargo.toml index 611bec1352..31cdf73a89 100644 --- a/examples/metrics-advanced/Cargo.toml +++ b/examples/metrics-advanced/Cargo.toml @@ -10,4 +10,3 @@ opentelemetry = { path = "../../opentelemetry", features = ["metrics"] } opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["spec_unstable_metrics_views", "rt-tokio"] } opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["metrics"] } tokio = { workspace = true, features = ["full"] } -serde_json = { workspace = true } diff --git a/examples/metrics-basic/Cargo.toml b/examples/metrics-basic/Cargo.toml index b68573e60a..69a8fc8628 100644 --- a/examples/metrics-basic/Cargo.toml +++ b/examples/metrics-basic/Cargo.toml @@ -10,5 +10,4 @@ opentelemetry = { path = "../../opentelemetry", features = ["metrics"] } opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["metrics", "rt-tokio"] } opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["metrics"]} tokio = { workspace = true, features = ["full"] } -serde_json = { workspace = true } diff --git a/examples/tracing-grpc/Cargo.toml b/examples/tracing-grpc/Cargo.toml index c4ba3e1105..c836904a37 100644 --- a/examples/tracing-grpc/Cargo.toml +++ b/examples/tracing-grpc/Cargo.toml @@ -20,7 +20,6 @@ opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["trace prost = { workspace = true } tokio = { workspace = true, features = ["full"] } tonic = { workspace = true } -serde_json = { workspace = true } [build-dependencies] tonic-build = { workspace = true } diff --git a/examples/tracing-jaeger/Cargo.toml b/examples/tracing-jaeger/Cargo.toml index 809365c66b..6257f0ebf0 100644 --- a/examples/tracing-jaeger/Cargo.toml +++ b/examples/tracing-jaeger/Cargo.toml @@ -9,5 +9,4 
@@ publish = false opentelemetry = { path = "../../opentelemetry" } opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["rt-tokio"] } opentelemetry-otlp = { path = "../../opentelemetry-otlp", features = ["tonic"] } -opentelemetry-semantic-conventions = { path = "../../opentelemetry-semantic-conventions" } tokio = { workspace = true, features = ["full"] } diff --git a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml index 4549607738..0c0c099024 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml +++ b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml @@ -16,16 +16,9 @@ experimental_metrics_periodicreader_with_async_runtime = ["opentelemetry_sdk/exp once_cell = { workspace = true } opentelemetry = { path = "../../../opentelemetry" } opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "experimental_metrics_periodicreader_with_async_runtime"]} -opentelemetry-http = { path = "../../../opentelemetry-http", optional = true, default-features = false} opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "logs"] , default-features = false} opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false} -opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" } -async-trait = { workspace = true, optional = true } -bytes = { workspace = true, optional = true } -http = { workspace = true, optional = true } -http-body-util = { workspace = true, optional = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true, features = ["std"]} -tracing-core = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] } diff --git a/opentelemetry-otlp/examples/basic-otlp-http/README.md b/opentelemetry-otlp/examples/basic-otlp-http/README.md index 2299cf0afb..eb65c74160 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/README.md +++ b/opentelemetry-otlp/examples/basic-otlp-http/README.md @@ -16,7 +16,7 @@ recommended approach when using OTLP exporters. While it can be modified to use a `SimpleExporter`, this requires enabling feature flag `reqwest-blocking-client` and making the `main()` a normal main and *not* `tokio::main` -// TODO: Metrics does not work with non tokio main when using `reqwest-blocking-client` today, fix that when switching +// TODO: Metrics does not work with non tokio main when using `reqwest-blocking-client` today, fix that when switching // default to use own thread. // TODO: Document `hyper` feature flag when using SimpleProcessor. @@ -66,14 +66,12 @@ Run the app which exports logs, metrics and traces via OTLP to the collector cargo run ``` - By default the app will use a `reqwest` client to send. A hyper 0.14 client can be used with the `hyper` feature enabled ```shell cargo run --no-default-features --features=hyper ``` - ## View results You should be able to see something similar below with different time and ID in the same console that docker runs. @@ -135,7 +133,7 @@ SpanEvent #0 -> Timestamp: 2024-05-14 02:15:56.824201397 +0000 UTC -> DroppedAttributesCount: 0 -> Attributes:: - -> bogons: Int(100) + -> some.key: Int(100) {"kind": "exporter", "data_type": "traces", "name": "logging"} ... 
``` diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index 2b5d878f9d..6b3dee3f07 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -106,7 +106,7 @@ async fn main() -> Result<(), Box> { // Create a new tracing::Fmt layer to print the logs to stdout. It has a // default filter of `info` level and above, and `debug` and above for logs - // from OpenTelemtry crates. The filter levels can be customized as needed. + // from OpenTelemetry crates. The filter levels can be customized as needed. let filter_fmt = EnvFilter::new("info").add_directive("opentelemetry=debug".parse().unwrap()); let fmt_layer = tracing_subscriber::fmt::layer() .with_thread_names(true) @@ -148,7 +148,7 @@ async fn main() -> Result<(), Box> { let span = cx.span(); span.add_event( "Nice operation!".to_string(), - vec![KeyValue::new("bogons", 100)], + vec![KeyValue::new("some.key", 100)], ); span.set_attribute(KeyValue::new("another.key", "yes")); diff --git a/opentelemetry-otlp/examples/basic-otlp/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp/Cargo.toml index 36d093154f..ad050bc338 100644 --- a/opentelemetry-otlp/examples/basic-otlp/Cargo.toml +++ b/opentelemetry-otlp/examples/basic-otlp/Cargo.toml @@ -10,9 +10,7 @@ once_cell = { workspace = true } opentelemetry = { path = "../../../opentelemetry" } opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio"] } opentelemetry-otlp = { path = "../../../opentelemetry-otlp" } -opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" } tokio = { version = "1.0", features = ["full"] } opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false} tracing = { workspace = true, features = ["std"]} -tracing-core = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] } \ No newline at end of file diff --git a/opentelemetry-otlp/examples/basic-otlp/README.md b/opentelemetry-otlp/examples/basic-otlp/README.md index 632e2155a6..ca02018ad5 100644 --- a/opentelemetry-otlp/examples/basic-otlp/README.md +++ b/opentelemetry-otlp/examples/basic-otlp/README.md @@ -15,7 +15,7 @@ recommended approach when using OTLP exporters. While it can be modified to use a `SimpleExporter`, this requires the main method to be a `tokio::main` function since the `tonic` client requires a Tokio runtime. If you prefer not to use `tokio::main`, then the `init_logs` and `init_traces` functions must be executed -within a Tokio runtime. +within a Tokio runtime. This example uses the default `PeriodicReader` for metrics, which uses its own thread for background processing/exporting. Since the `tonic` client requires a @@ -154,7 +154,7 @@ SpanEvent #0 -> Timestamp: 2024-05-22 20:25:42.8770471 +0000 UTC -> DroppedAttributesCount: 0 -> Attributes:: - -> bogons: Int(100) + -> some.key: Int(100) {"kind": "exporter", "data_type": "traces", "name": "logging"} ``` diff --git a/opentelemetry-otlp/examples/basic-otlp/src/main.rs b/opentelemetry-otlp/examples/basic-otlp/src/main.rs index 10a60d3515..c5425f8a9b 100644 --- a/opentelemetry-otlp/examples/basic-otlp/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp/src/main.rs @@ -80,7 +80,7 @@ async fn main() -> Result<(), Box> { // Create a new tracing::Fmt layer to print the logs to stdout.
It has a // default filter of `info` level and above, and `debug` and above for logs - // from OpenTelemtry crates. The filter levels can be customized as needed. + // from OpenTelemetry crates. The filter levels can be customized as needed. let filter_fmt = EnvFilter::new("info").add_directive("opentelemetry=debug".parse().unwrap()); let fmt_layer = tracing_subscriber::fmt::layer() .with_thread_names(true) diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index da6c280a61..e718960686 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -66,8 +66,11 @@ pub struct HttpConfig { impl Default for HttpConfig { fn default() -> Self { #[cfg(feature = "reqwest-blocking-client")] - let default_client = - Some(Arc::new(reqwest::blocking::Client::new()) as Arc); + let default_client = std::thread::spawn(|| { + Some(Arc::new(reqwest::blocking::Client::new()) as Arc) + }) + .join() + .expect("creating reqwest::blocking::Client on a new thread not to fail"); #[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))] let default_client = Some(Arc::new(reqwest::Client::new()) as Arc); #[cfg(all( diff --git a/opentelemetry-otlp/tests/integration_test/.gitignore b/opentelemetry-otlp/tests/integration_test/.gitignore new file mode 100644 index 0000000000..059fd6dce2 --- /dev/null +++ b/opentelemetry-otlp/tests/integration_test/.gitignore @@ -0,0 +1,2 @@ +lcov.info +actual/*.json diff --git a/opentelemetry-otlp/tests/integration_test/Cargo.toml b/opentelemetry-otlp/tests/integration_test/Cargo.toml index 413673286e..60b4869f1e 100644 --- a/opentelemetry-otlp/tests/integration_test/Cargo.toml +++ b/opentelemetry-otlp/tests/integration_test/Cargo.toml @@ -4,15 +4,19 @@ version = "0.1.0" edition = "2021" publish = false - [dependencies] -opentelemetry = { path = "../../../opentelemetry", features = ["metrics", "logs"] } -opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "logs", "testing"] } -opentelemetry-proto = { path = "../../../opentelemetry-proto", features = ["gen-tonic-messages", "trace", "logs", "with-serde"] } +opentelemetry = { path = "../../../opentelemetry", features = [] } +opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "testing"] } +opentelemetry-proto = { path = "../../../opentelemetry-proto", features = ["gen-tonic-messages", "trace", "logs", "metrics", "with-serde"] } log = { workspace = true } tokio = { version = "1.0", features = ["full"] } serde_json = "1" -testcontainers = "0.15.0" +testcontainers = { version = "0.23.1", features = ["http_wait"]} +once_cell.workspace = true +anyhow = "1.0.94" +ctor = "0.2.9" +tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] } +tracing = "0.1.41" [target.'cfg(unix)'.dependencies] opentelemetry-appender-log = { path = "../../../opentelemetry-appender-log", default-features = false} @@ -20,11 +24,12 @@ opentelemetry-otlp = { path = "../../../opentelemetry-otlp", default-features = opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" } [features] -hyper-client = ["opentelemetry-otlp/hyper-client", "opentelemetry-otlp/http-proto", "opentelemetry-otlp/trace", "opentelemetry-otlp/logs", "opentelemetry-otlp/metrics"] -reqwest-client = ["opentelemetry-otlp/reqwest-client", "opentelemetry-otlp/http-proto", "opentelemetry-otlp/trace","opentelemetry-otlp/logs", "opentelemetry-otlp/metrics"] 
-reqwest-blocking-client = ["opentelemetry-otlp/reqwest-blocking-client", "opentelemetry-otlp/http-proto", "opentelemetry-otlp/trace","opentelemetry-otlp/logs", "opentelemetry-otlp/metrics"] -tonic-client = ["opentelemetry-otlp/grpc-tonic", "opentelemetry-otlp/trace", "opentelemetry-otlp/logs", "opentelemetry-otlp/metrics"] +hyper-client = ["opentelemetry-otlp/hyper-client", "opentelemetry-otlp/http-proto", "opentelemetry-otlp/trace", "opentelemetry-otlp/logs", "opentelemetry-otlp/metrics", "internal-logs"] +reqwest-client = ["opentelemetry-otlp/reqwest-client", "opentelemetry-otlp/http-proto", "opentelemetry-otlp/trace","opentelemetry-otlp/logs", "opentelemetry-otlp/metrics", "internal-logs"] +reqwest-blocking-client = ["opentelemetry-otlp/reqwest-blocking-client", "opentelemetry-otlp/http-proto", "opentelemetry-otlp/trace","opentelemetry-otlp/logs", "opentelemetry-otlp/metrics", "internal-logs"] +tonic-client = ["opentelemetry-otlp/grpc-tonic", "opentelemetry-otlp/trace", "opentelemetry-otlp/logs", "opentelemetry-otlp/metrics", "internal-logs"] +internal-logs = ["opentelemetry-otlp/internal-logs"] # Keep tonic as the default client -default = ["tonic-client"] +default = ["tonic-client", "internal-logs"] diff --git a/opentelemetry-otlp/tests/integration_test/README.md b/opentelemetry-otlp/tests/integration_test/README.md new file mode 100644 index 0000000000..10e34df6a3 --- /dev/null +++ b/opentelemetry-otlp/tests/integration_test/README.md @@ -0,0 +1,17 @@ +# OTLP - Integration Tests + +This directory contains integration tests for `opentelemetry-otlp`. It uses +[testcontainers](https://testcontainers.com/) to start an instance of the OTEL +collector using [otel-collector-config.yaml](otel-collector-config.yaml), which +then uses a file exporter per signal to write the output it receives back to the +host machine. + +The tests connect directly to the collector on `localhost:4317` and +`localhost:4318`, push data through, and then check that what they expect has +popped back out into the files output by the collector. + +## Pre-requisites + +* Docker, for the test container +* TCP/4317 and TCP/4318 free on your local machine. If you are running another + collector, you'll need to stop it for the tests to run. diff --git a/opentelemetry-otlp/tests/integration_test/actual/README.md b/opentelemetry-otlp/tests/integration_test/actual/README.md new file mode 100644 index 0000000000..9380bd7807 --- /dev/null +++ b/opentelemetry-otlp/tests/integration_test/actual/README.md @@ -0,0 +1 @@ +Output from the otel-collector goes here. 
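To make the flow described in the README above concrete, here is a minimal sketch of how a test drives this harness, using the `test_utils` helpers added later in this diff; the test name and the elided emit/assert body are illustrative assumptions:

```rust
use anyhow::Result;
use ctor::dtor;
use integration_test_runner::test_utils;

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn emits_telemetry() -> Result<()> {
    // Bring up the collector container (a no-op if it is already running).
    test_utils::start_collector_container().await?;

    // ... push spans/logs/metrics over OTLP to localhost:4317 or :4318 ...

    // The collector's file exporter writes whatever it received back to the
    // host; give it a moment to flush, then inspect the output.
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    assert!(!std::fs::read_to_string(test_utils::TRACES_FILE)?.is_empty());
    Ok(())
}

// Tear the container down once every test in this file has finished, so it
// does not keep ports 4317/4318 occupied for the next suite.
#[dtor]
fn shutdown() {
    test_utils::stop_collector_container();
}
```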
diff --git a/opentelemetry-otlp/tests/integration_test/expected/different_metrics.json b/opentelemetry-otlp/tests/integration_test/expected/different_metrics.json new file mode 100644 index 0000000000..5b9bcdba0a --- /dev/null +++ b/opentelemetry-otlp/tests/integration_test/expected/different_metrics.json @@ -0,0 +1,133 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "metrics-integration-test" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": { + "name": "meter" + }, + "metrics": [ + { + "name": "counter_u64", + "sum": { + "dataPoints": [ + { + "attributes": [ + { + "key": "mykey1", + "value": { + "stringValue": "mydifferentval" + } + }, + { + "key": "mykey2", + "value": { + "stringValue": "myvalue2" + } + } + ], + "startTimeUnixNano": "1734094309366798000", + "timeUnixNano": "1734094317871514000", + "asInt": "15" + } + ], + "aggregationTemporality": 2, + "isMonotonic": true + } + }, + { + "name": "example_histogram", + "histogram": { + "dataPoints": [ + { + "attributes": [ + { + "key": "mykey3", + "value": { + "stringValue": "myvalue4" + } + } + ], + "startTimeUnixNano": "1734094309366875000", + "timeUnixNano": "1734094317871537000", + "count": "1", + "sum": 42, + "bucketCounts": [ + "0", + "0", + "0", + "0", + "1", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0" + ], + "explicitBounds": [ + 0, + 5, + 10, + 25, + 50, + 75, + 100, + 250, + 500, + 750, + 1000, + 2500, + 5000, + 7500, + 10000 + ], + "min": 42, + "max": 42 + } + ], + "aggregationTemporality": 2 + } + }, + { + "name": "example_up_down_counter", + "sum": { + "dataPoints": [ + { + "attributes": [ + { + "key": "mykey5", + "value": { + "stringValue": "myvalue5" + } + } + ], + "startTimeUnixNano": "1734094309366941000", + "timeUnixNano": "1734094317871548000", + "asInt": "-1" + } + ], + "aggregationTemporality": 2 + } + } + ] + } + ] + } + ] +} diff --git a/opentelemetry-otlp/tests/integration_test/expected/metrics.json b/opentelemetry-otlp/tests/integration_test/expected/metrics.json index fa713b8ea3..f1711d889e 100644 --- a/opentelemetry-otlp/tests/integration_test/expected/metrics.json +++ b/opentelemetry-otlp/tests/integration_test/expected/metrics.json @@ -6,7 +6,7 @@ { "key": "service.name", "value": { - "stringValue": "my.service" + "stringValue": "metrics-integration-test" } } ] @@ -14,106 +14,120 @@ "scopeMetrics": [ { "scope": { - "name": "my.library", - "version": "1.0.0", - "attributes": [ - { - "key": "my.scope.attribute", - "value": { - "stringValue": "some scope attribute" - } - } - ] + "name": "meter" }, "metrics": [ { - "name": "my.counter", - "unit": "1", - "description": "I am a Counter", - "metadata": [], + "name": "counter_u64", "sum": { - "aggregationTemporality": 1, - "isMonotonic": true, "dataPoints": [ { - "asDouble": 5, - "startTimeUnixNano": "1544712660300000000", - "timeUnixNano": "1544712660300000000", "attributes": [ { - "key": "my.counter.attr", + "key": "mykey1", "value": { - "stringValue": "some value" + "stringValue": "myvalue1" + } + }, + { + "key": "mykey2", + "value": { + "stringValue": "myvalue2" } } ], - "exemplars": [], - "flags": 0 + "startTimeUnixNano": "1734094309366798000", + "timeUnixNano": "1734094317871514000", + "asInt": "10" } - ] + ], + "aggregationTemporality": 2, + "isMonotonic": true } }, { - "name": "my.gauge", - "unit": "1", - "description": "I am a Gauge", - "metadata": [], - "gauge": { + "name": "example_histogram", + "histogram": { "dataPoints": [ { - "asDouble": 10, - 
"startTimeUnixNano": "1544712660300000000", - "timeUnixNano": "1544712660300000000", "attributes": [ { - "key": "my.gauge.attr", + "key": "mykey3", "value": { - "stringValue": "some value" + "stringValue": "myvalue4" } } ], - "exemplars": [], - "flags": 0 + "startTimeUnixNano": "1734094309366875000", + "timeUnixNano": "1734094317871537000", + "count": "1", + "sum": 42, + "bucketCounts": [ + "0", + "0", + "0", + "0", + "1", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0" + ], + "explicitBounds": [ + 0, + 5, + 10, + 25, + 50, + 75, + 100, + 250, + 500, + 750, + 1000, + 2500, + 5000, + 7500, + 10000 + ], + "min": 42, + "max": 42 } - ] + ], + "aggregationTemporality": 2 } }, { - "name": "my.histogram", - "unit": "1", - "description": "I am a Histogram", - "metadata": [], - "histogram": { - "aggregationTemporality": 1, + "name": "example_up_down_counter", + "sum": { "dataPoints": [ { - "startTimeUnixNano": "1544712660300000000", - "timeUnixNano": "1544712660300000000", - "count": 2, - "sum": 2, - "bucketCounts": [1,1], - "explicitBounds": [1], - "min": 0, - "max": 2, "attributes": [ { - "key": "my.histogram.attr", + "key": "mykey5", "value": { - "stringValue": "some value" + "stringValue": "myvalue5" } } ], - "exemplars": [], - "flags": 0 + "startTimeUnixNano": "1734094309366941000", + "timeUnixNano": "1734094317871548000", + "asInt": "-1" } - ] + ], + "aggregationTemporality": 2 } } - ], - "schemaUrl": "whatever" + ] } - ], - "schemaUrl": "whatever" + ] } ] -} \ No newline at end of file +} diff --git a/opentelemetry-otlp/tests/integration_test/expected/metrics/test_flush_on_shutdown.json b/opentelemetry-otlp/tests/integration_test/expected/metrics/test_flush_on_shutdown.json new file mode 100644 index 0000000000..c390a70664 --- /dev/null +++ b/opentelemetry-otlp/tests/integration_test/expected/metrics/test_flush_on_shutdown.json @@ -0,0 +1,39 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "metrics-integration-test" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": { + "name": "test_flush_on_shutdown" + }, + "metrics": [ + { + "name": "counter_", + "sum": { + "dataPoints": [ + { + "startTimeUnixNano": "1734370440803831000", + "timeUnixNano": "1734370440803905000", + "asInt": "123" + } + ], + "aggregationTemporality": 2, + "isMonotonic": true + } + } + ] + } + ] + } + ] +} diff --git a/opentelemetry-otlp/tests/integration_test/expected/metrics/test_histogram_meter.json b/opentelemetry-otlp/tests/integration_test/expected/metrics/test_histogram_meter.json new file mode 100644 index 0000000000..9ca8a5a49e --- /dev/null +++ b/opentelemetry-otlp/tests/integration_test/expected/metrics/test_histogram_meter.json @@ -0,0 +1,84 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "metrics-integration-test" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": { + "name": "test_histogram_meter" + }, + "metrics": [ + { + "name": "example_histogram", + "histogram": { + "dataPoints": [ + { + "attributes": [ + { + "key": "mykey3", + "value": { + "stringValue": "myvalue4" + } + } + ], + "startTimeUnixNano": "1734259947902842000", + "timeUnixNano": "1734259949551023000", + "count": "1", + "sum": 42, + "bucketCounts": [ + "0", + "0", + "0", + "0", + "1", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0" + ], + "explicitBounds": [ + 0, + 5, + 10, + 25, + 50, + 75, + 100, + 250, + 500, + 750, + 1000, + 2500, + 
5000, + 7500, + 10000 + ], + "min": 42, + "max": 42 + } + ], + "aggregationTemporality": 2 + } + } + ] + } + ] + } + ] +} diff --git a/opentelemetry-otlp/tests/integration_test/expected/metrics/test_u64_counter_meter.json b/opentelemetry-otlp/tests/integration_test/expected/metrics/test_u64_counter_meter.json new file mode 100644 index 0000000000..aeb3da7b20 --- /dev/null +++ b/opentelemetry-otlp/tests/integration_test/expected/metrics/test_u64_counter_meter.json @@ -0,0 +1,53 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "metrics-integration-test" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": { + "name": "test_u64_counter_meter" + }, + "metrics": [ + { + "name": "counter_u64", + "sum": { + "dataPoints": [ + { + "attributes": [ + { + "key": "mykey1", + "value": { + "stringValue": "myvalue1" + } + }, + { + "key": "mykey2", + "value": { + "stringValue": "myvalue2" + } + } + ], + "startTimeUnixNano": "1734255506254812000", + "timeUnixNano": "1734255533415552000", + "asInt": "10" + } + ], + "aggregationTemporality": 2, + "isMonotonic": true + } + } + ] + } + ] + } + ] +} diff --git a/opentelemetry-otlp/tests/integration_test/expected/metrics/test_up_down_meter.json b/opentelemetry-otlp/tests/integration_test/expected/metrics/test_up_down_meter.json new file mode 100755 index 0000000000..a82cd63acf --- /dev/null +++ b/opentelemetry-otlp/tests/integration_test/expected/metrics/test_up_down_meter.json @@ -0,0 +1,46 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "metrics-integration-test" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": { + "name": "test_up_down_meter" + }, + "metrics": [ + { + "name": "example_up_down_counter", + "sum": { + "dataPoints": [ + { + "attributes": [ + { + "key": "mykey5", + "value": { + "stringValue": "myvalue5" + } + } + ], + "startTimeUnixNano": "1734259947902844000", + "timeUnixNano": "1734259952816822000", + "asInt": "-1" + } + ], + "aggregationTemporality": 2 + } + } + ] + } + ] + } + ] +} diff --git a/opentelemetry-otlp/tests/integration_test/expected/serialized_metrics.json b/opentelemetry-otlp/tests/integration_test/expected/serialized_metrics.json index 4910e128a2..de13fb3cbf 100644 --- a/opentelemetry-otlp/tests/integration_test/expected/serialized_metrics.json +++ b/opentelemetry-otlp/tests/integration_test/expected/serialized_metrics.json @@ -6,7 +6,7 @@ { "key": "service.name", "value": { - "stringValue": "my.service" + "stringValue": "metrics-integration-test" } } ], @@ -15,112 +15,81 @@ "scopeMetrics": [ { "scope": { - "name": "my.library", - "version": "1.0.0", - "attributes": [ - { - "key": "my.scope.attribute", - "value": { - "stringValue": "some scope attribute" - } - } - ], + "name": "meter", + "version": "", + "attributes": [], "droppedAttributesCount": 0 }, "metrics": [ { - "name": "my.counter", - "description": "I am a Counter", - "unit": "1", + "name": "counter_u64", + "description": "", + "unit": "", "metadata": [], "sum": { "dataPoints": [ { "attributes": [ { - "key": "my.counter.attr", + "key": "mykey1", "value": { - "stringValue": "some value" + "stringValue": "myvalue1" + } + }, + { + "key": "mykey2", + "value": { + "stringValue": "myvalue2" } } ], - "startTimeUnixNano": "1544712660300000000", - "timeUnixNano": "1544712660300000000", + "startTimeUnixNano": "1734094309366798000", + "timeUnixNano": "1734094317871514000", "exemplars": [], - "flags": 0, - "asDouble": 5.0 + 
"flags": 0 } ], - "aggregationTemporality": 1, + "aggregationTemporality": 2, "isMonotonic": true } }, { - "name": "my.gauge", - "description": "I am a Gauge", - "unit": "1", - "metadata": [], - "gauge": { - "dataPoints": [ - { - "attributes": [ - { - "key": "my.gauge.attr", - "value": { - "stringValue": "some value" - } - } - ], - "startTimeUnixNano": "1544712660300000000", - "timeUnixNano": "1544712660300000000", - "exemplars": [], - "flags": 0, - "asDouble": 10.0 - } - ] - } + "name": "example_histogram", + "description": "", + "unit": "", + "metadata": [] }, { - "name": "my.histogram", - "description": "I am a Histogram", - "unit": "1", + "name": "example_up_down_counter", + "description": "", + "unit": "", "metadata": [], - "histogram": { + "sum": { "dataPoints": [ { "attributes": [ { - "key": "my.histogram.attr", + "key": "mykey5", "value": { - "stringValue": "some value" + "stringValue": "myvalue5" } } ], - "startTimeUnixNano": "1544712660300000000", - "timeUnixNano": "1544712660300000000", - "count": 2, - "sum": 2.0, - "bucketCounts": [ - 1, - 1 - ], - "explicitBounds": [ - 1.0 - ], + "startTimeUnixNano": "1734094309366941000", + "timeUnixNano": "1734094317871548000", "exemplars": [], - "flags": 0, - "min": 0.0, - "max": 2.0 + "flags": 0 } ], - "aggregationTemporality": 1 + "aggregationTemporality": 2, + "isMonotonic": false } } ], - "schemaUrl": "whatever" + "schemaUrl": "" } ], - "schemaUrl": "whatever" + "schemaUrl": "" } ] } \ No newline at end of file diff --git a/opentelemetry-otlp/tests/integration_test/otel-collector-config.yaml b/opentelemetry-otlp/tests/integration_test/otel-collector-config.yaml index 7cd19bbfee..548d6fa44a 100644 --- a/opentelemetry-otlp/tests/integration_test/otel-collector-config.yaml +++ b/opentelemetry-otlp/tests/integration_test/otel-collector-config.yaml @@ -7,14 +7,23 @@ receivers: endpoint: 0.0.0.0:4318 exporters: - file: - path: /testresults/result.json + file/traces: + path: /testresults/traces.json + file/logs: + path: /testresults/logs.json + rotation: + file/metrics: + path: /testresults/metrics.json service: pipelines: traces: receivers: [otlp] - exporters: [file] + exporters: [file/traces] logs: receivers: [otlp] - exporters: [file] + exporters: [file/logs] + metrics: + receivers: [otlp] + exporters: [file/metrics] + diff --git a/opentelemetry-otlp/tests/integration_test/src/images.rs b/opentelemetry-otlp/tests/integration_test/src/images.rs deleted file mode 100644 index 37a9c1b38b..0000000000 --- a/opentelemetry-otlp/tests/integration_test/src/images.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::collections::HashMap; -use testcontainers::core::WaitFor; -use testcontainers::Image; - -pub struct Collector { - volumes: HashMap, -} - -impl Image for Collector { - type Args = (); - - fn name(&self) -> String { - "otel/opentelemetry-collector".to_string() - } - - fn tag(&self) -> String { - "latest".to_string() - } - - fn ready_conditions(&self) -> Vec { - vec![WaitFor::Nothing] - } - - fn volumes(&self) -> Box + '_> { - Box::new(self.volumes.iter()) - } - - fn expose_ports(&self) -> Vec { - vec![ - // 4317, // gRPC port, defined in Dockerfile - // 4318, // HTTP port, defined in Dockerfile - ] - } -} - -impl Default for Collector { - fn default() -> Self { - Collector { - volumes: HashMap::from([( - "./otel-collector-config.yaml".into(), - "/etc/otelcol/config.yaml".into(), - )]), - } - } -} - -impl Collector { - pub fn with_volume(mut self, src: &str, dst: &str) -> Self { - self.volumes.insert(src.into(), dst.into()); - self - } -} diff --git 
a/opentelemetry-otlp/tests/integration_test/src/lib.rs b/opentelemetry-otlp/tests/integration_test/src/lib.rs index e6bc88c742..65faf81bf4 100644 --- a/opentelemetry-otlp/tests/integration_test/src/lib.rs +++ b/opentelemetry-otlp/tests/integration_test/src/lib.rs @@ -1,4 +1,4 @@ -pub mod images; pub mod logs_asserter; pub mod metrics_asserter; +pub mod test_utils; pub mod trace_asserter; diff --git a/opentelemetry-otlp/tests/integration_test/src/metrics_asserter.rs b/opentelemetry-otlp/tests/integration_test/src/metrics_asserter.rs index 4845270999..f370df8a62 100644 --- a/opentelemetry-otlp/tests/integration_test/src/metrics_asserter.rs +++ b/opentelemetry-otlp/tests/integration_test/src/metrics_asserter.rs @@ -1,40 +1,64 @@ +use anyhow::Result; +use serde_json::Value; use std::fs::File; +use std::io::{BufReader, Read}; -use opentelemetry_proto::tonic::metrics::v1::{MetricsData, ResourceMetrics}; +pub fn read_metrics_from_json(file: File) -> Result { + // Create a buffered reader for the file + let mut reader = BufReader::new(file); + let mut contents = String::new(); + + // Read the file contents into a string + reader + .read_to_string(&mut contents) + .expect("Failed to read json file"); + + // Parse the contents into a JSON Value + let metrics_data: Value = serde_json::from_str(&contents)?; + Ok(metrics_data) +} pub struct MetricsAsserter { - results: Vec, - expected: Vec, + results: Value, + expected: Value, } impl MetricsAsserter { - pub fn new(results: Vec, expected: Vec) -> Self { + pub fn new(results: Value, expected: Value) -> Self { MetricsAsserter { results, expected } } - pub fn assert(self) { - self.assert_resource_metrics_eq(&self.results, &self.expected); + pub fn assert(mut self) { + // Normalize JSON by cleaning out timestamps + Self::zero_out_timestamps(&mut self.results); + Self::zero_out_timestamps(&mut self.expected); + + // Perform the assertion + assert_eq!( + self.results, self.expected, + "Metrics did not match. Results: {:#?}, Expected: {:#?}", + self.results, self.expected + ); } - fn assert_resource_metrics_eq( - &self, - results: &[ResourceMetrics], - expected: &[ResourceMetrics], - ) { - assert_eq!(results.len(), expected.len()); - for i in 0..results.len() { - let result_resource_metrics = &results[i]; - let expected_resource_metrics = &expected[i]; - assert_eq!(result_resource_metrics, expected_resource_metrics); + /// Recursively removes or zeros out timestamp fields in the JSON + fn zero_out_timestamps(value: &mut Value) { + match value { + Value::Object(map) => { + for (key, val) in map.iter_mut() { + if key == "startTimeUnixNano" || key == "timeUnixNano" { + *val = Value::String("0".to_string()); + } else { + Self::zero_out_timestamps(val); + } + } + } + Value::Array(array) => { + for item in array.iter_mut() { + Self::zero_out_timestamps(item); + } + } + _ => {} } } } - -// read a file contains ResourceMetrics in json format -pub fn read_metrics_from_json(file: File) -> Vec { - let reader = std::io::BufReader::new(file); - - let metrics_data: MetricsData = - serde_json::from_reader(reader).expect("Failed to read json file"); - metrics_data.resource_metrics -} diff --git a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs new file mode 100644 index 0000000000..bd62674868 --- /dev/null +++ b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs @@ -0,0 +1,151 @@ +//! Supporting infrastructure for OTLP integration tests. +//! +//! 
This module provides the pieces needed to work with an actual opentelemetry-collector +//! instance, which is started in Docker and has its output plumbed back into the host filesystem. +//! This lets us write tests that push data over OTLP (HTTP or gRPC) to the collector, and then read +//! that data back from the filesystem to ensure everything worked out as expected. +//! +//! To use this module, all you need to do is call `start_collector_container()` from each +//! of your tests, and use a single `#[dtor]` at the end of your test file to call +//! `stop_collector_container`. Note that as cargo integration tests run a process-per-test-file, +//! each test will get its own fresh instance of the container. +//! +//! Only a single test suite can run at once, as each container has statically mapped ports, but +//! this works nicely with the way cargo executes the suite. +//! +//! To skip integration tests with cargo, you can run `cargo test --mod`, which will run unit tests +//! only. +//! +#![cfg(unix)] + +use anyhow::Result; +use opentelemetry::{otel_debug, otel_info}; +use std::fs; +use std::fs::File; +use std::os::unix::fs::PermissionsExt; +use std::sync::{Arc, Mutex, Once, OnceLock}; +use testcontainers::core::wait::HttpWaitStrategy; +use testcontainers::core::{ContainerPort, Mount}; +use testcontainers::{core::WaitFor, runners::AsyncRunner, ContainerAsync, GenericImage, ImageExt}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Layer}; + +// Static references for container management +static COLLECTOR_ARC: OnceLock<Mutex<Option<Arc<ContainerAsync<GenericImage>>>>> = OnceLock::new(); + +pub static METRICS_FILE: &str = "./actual/metrics.json"; +pub static LOGS_FILE: &str = "./actual/logs.json"; +pub static TRACES_FILE: &str = "./actual/traces.json"; + +static INIT_TRACING: Once = Once::new(); + +fn init_tracing() { + INIT_TRACING.call_once(|| { + // Info and above for all, debug for opentelemetry + let filter_fmt = + EnvFilter::new("info").add_directive("opentelemetry=debug".parse().unwrap()); + let fmt_layer = tracing_subscriber::fmt::layer() + .with_thread_names(true) + .with_filter(filter_fmt); + + // Initialize the tracing subscriber with the Fmt layer. + tracing_subscriber::registry().with(fmt_layer).init(); + otel_info!(name: "tracing initialization completed!"); + }); +} + +pub async fn start_collector_container() -> Result<()> { + init_tracing(); + + let mut arc_guard = COLLECTOR_ARC + .get_or_init(|| Mutex::new(None)) + .lock() + .unwrap(); + + // If the container isn't running, start it.
+ if arc_guard.is_none() { + // Make sure all our test data is mounted + upsert_empty_file(METRICS_FILE); + upsert_empty_file(TRACES_FILE); + upsert_empty_file(LOGS_FILE); + + // Start a new container + let container_instance = GenericImage::new("otel/opentelemetry-collector", "latest") + .with_wait_for(WaitFor::http( + HttpWaitStrategy::new("/") + .with_expected_status_code(404u16) + .with_port(ContainerPort::Tcp(4318)), + )) + .with_mapped_port(4317, ContainerPort::Tcp(4317)) + .with_mapped_port(4318, ContainerPort::Tcp(4318)) + .with_mount(Mount::bind_mount( + fs::canonicalize("./otel-collector-config.yaml")?.to_string_lossy(), + "/etc/otelcol/config.yaml", + )) + .with_mount(Mount::bind_mount( + fs::canonicalize("./actual/logs.json")?.to_string_lossy(), + "/testresults/logs.json", + )) + .with_mount(Mount::bind_mount( + fs::canonicalize("./actual/metrics.json")?.to_string_lossy(), + "/testresults/metrics.json", + )) + .with_mount(Mount::bind_mount( + fs::canonicalize("./actual/traces.json")?.to_string_lossy(), + "/testresults/traces.json", + )) + .start() + .await?; + + let container = Arc::new(container_instance); + otel_debug!( + name: "Container started", + ports = format!("{:?}", container.ports().await)); + + // Give the container a second to stabilize + //tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + // Store the container in COLLECTOR_ARC + *arc_guard = Some(Arc::clone(&container)); + } + + Ok(()) +} + +/// +/// Creates an empty file with permissions that make it usable both within docker +/// and on the host. +/// +fn upsert_empty_file(path: &str) -> File { + let file = File::create(path).unwrap(); + file.set_permissions(std::fs::Permissions::from_mode(0o666)) + .unwrap(); + file +} + +/// +/// Shuts down our collector container. This should be run as part of each test +/// suite shutting down! +/// +pub fn stop_collector_container() { + // This is a bit heinous. We don't have an async runtime left when + // we hit this call, so we can't use the async methods on the testcontainers + // interface to shutdown. + // We _need_ to do this here, because otherwise we have no "all the tests in the module + // were complete" hook. 
+ // + // https://github.com/testcontainers/testcontainers-rs/issues/707 + otel_debug!(name: "stop_collector_container"); + + if let Some(mutex_option_arc) = COLLECTOR_ARC.get() { + let guard = mutex_option_arc.lock().unwrap(); + if let Some(container_arc) = &*guard { + std::process::Command::new("docker") + .args(["container", "rm", "-f", container_arc.id()]) + .output() + .expect("failed to stop testcontainer"); + } + } +} diff --git a/opentelemetry-otlp/tests/integration_test/src/trace_asserter.rs b/opentelemetry-otlp/tests/integration_test/src/trace_asserter.rs index 00c7c2300d..ce7eec928a 100644 --- a/opentelemetry-otlp/tests/integration_test/src/trace_asserter.rs +++ b/opentelemetry-otlp/tests/integration_test/src/trace_asserter.rs @@ -1,3 +1,4 @@ +use anyhow::Result; use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, Span, TracesData}; use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Formatter}; @@ -213,9 +214,9 @@ fn span_eq(left: &Span, right: &Span) -> bool { } // read a file contains ResourceSpans in json format -pub fn read_spans_from_json(file: File) -> Vec { +pub fn read_spans_from_json(file: File) -> Result> { let reader = std::io::BufReader::new(file); - let trace_data: TracesData = serde_json::from_reader(reader).expect("Failed to read json file"); - trace_data.resource_spans + let trace_data: TracesData = serde_json::from_reader(reader)?; + Ok(trace_data.resource_spans) } diff --git a/opentelemetry-otlp/tests/integration_test/tests/integration_tests.rs b/opentelemetry-otlp/tests/integration_test/tests/integration_tests.rs deleted file mode 100644 index 5f5468d0dc..0000000000 --- a/opentelemetry-otlp/tests/integration_test/tests/integration_tests.rs +++ /dev/null @@ -1,142 +0,0 @@ -#![cfg(unix)] - -use integration_test_runner::images::Collector; -use std::fs::File; -use std::os::unix::fs::PermissionsExt; -use std::time::Duration; -use testcontainers::clients::Cli; -use testcontainers::core::Port; -use testcontainers::RunnableImage; - -mod logs; -mod metrics; -mod traces; - -const COLLECTOR_CONTAINER_NAME: &str = "otel-collector"; -const TEST_RESULT_DIR_IN_CONTAINER: &str = "testresults"; -const EXPECTED_DIR: &str = "./expected"; -const RESULT_FILE_PATH: &str = "./result.json"; - -struct TestSuite { - expected_file_path: &'static str, -} - -impl TestSuite { - fn new(expected_file_path: &'static str) -> Self { - Self { expected_file_path } - } - - pub fn expected_file_path(&self) -> String { - format!("{}/{}", EXPECTED_DIR, self.expected_file_path) - } - - pub fn result_file_path_in_container(&self) -> String { - format!("/{}/{}", TEST_RESULT_DIR_IN_CONTAINER, RESULT_FILE_PATH) - } - - pub fn result_file_path(&self) -> String { - format!("./{}", RESULT_FILE_PATH) - } - - /// Create a empty file on localhost and copy it to container with proper permissions - /// we have to create the file for the container otherwise we will encounter a permission denied error. 
- /// see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/3159 - pub fn create_temporary_result_file(&self) -> File { - let file = File::create(self.result_file_path()).unwrap(); - file.set_permissions(std::fs::Permissions::from_mode(0o666)) - .unwrap(); - file - } -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[ignore] // skip when running unit test -async fn integration_tests() { - trace_integration_tests().await; - logs_integration_tests().await; -} - -async fn trace_integration_tests() { - let test_suites = [TestSuite::new("traces.json")]; - let mut collector_image = Collector::default(); - for test in test_suites.as_ref() { - let _ = test.create_temporary_result_file(); - collector_image = collector_image.with_volume( - test.result_file_path().as_str(), - test.result_file_path_in_container().as_str(), - ); - } - - let docker = Cli::default(); - let mut image = - RunnableImage::from(collector_image).with_container_name(COLLECTOR_CONTAINER_NAME); - - for port in [ - 4317, // gRPC port - 4318, // HTTP port - ] { - image = image.with_mapped_port(Port { - local: port, - internal: port, - }) - } - - let collector_container = docker.run(image); - - tokio::time::sleep(Duration::from_secs(5)).await; - traces::traces().await.unwrap(); - - // wait for file to flush to disks - // ideally we should use volume mount but otel collector file exporter doesn't handle permission too well - // bind mount mitigate the issue by set up the permission correctly on host system - tokio::time::sleep(Duration::from_secs(5)).await; - traces::assert_traces_results( - test_suites[0].result_file_path().as_str(), - test_suites[0].expected_file_path().as_str(), - ); - - collector_container.stop(); -} - -async fn logs_integration_tests() { - let test_suites = [TestSuite::new("logs.json")]; - - let mut collector_image = Collector::default(); - for test in test_suites.as_ref() { - let _ = test.create_temporary_result_file(); - collector_image = collector_image.with_volume( - test.result_file_path().as_str(), - test.result_file_path_in_container().as_str(), - ); - } - - let docker = Cli::default(); - let mut image = - RunnableImage::from(collector_image).with_container_name(COLLECTOR_CONTAINER_NAME); - - for port in [ - 4317, // gRPC port - 4318, // HTTP port - ] { - image = image.with_mapped_port(Port { - local: port, - internal: port, - }) - } - - let collector_container = docker.run(image); - - tokio::time::sleep(Duration::from_secs(5)).await; - logs::logs().await.unwrap(); - - // wait for file to flush to disks - // ideally we should use volume mount but otel collector file exporter doesn't handle permission too well - // bind mount mitigate the issue by set up the permission correctly on host system - tokio::time::sleep(Duration::from_secs(5)).await; - logs::assert_logs_results( - test_suites[0].result_file_path().as_str(), - test_suites[0].expected_file_path().as_str(), - ); - - collector_container.stop(); -} diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index 4ff7b67eb6..8498a24913 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -1,16 +1,19 @@ #![cfg(unix)] +use anyhow::Result; +use ctor::dtor; use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter}; +use integration_test_runner::test_utils; use log::{info, Level}; use opentelemetry_appender_log::OpenTelemetryLogBridge; use 
opentelemetry_otlp::LogExporter; -use opentelemetry_sdk::logs::{LogError, LoggerProvider}; +use opentelemetry_sdk::logs::LoggerProvider; use opentelemetry_sdk::{logs as sdklogs, runtime, Resource}; -use std::error::Error; use std::fs::File; use std::os::unix::fs::MetadataExt; +use std::time::Duration; -fn init_logs() -> Result<sdklogs::LoggerProvider, LogError> { +fn init_logs() -> Result<sdklogs::LoggerProvider> { let exporter_builder = LogExporter::builder(); #[cfg(feature = "tonic-client")] let exporter_builder = exporter_builder.with_tonic(); @@ -34,7 +37,11 @@ fn init_logs() -> Result<sdklogs::LoggerProvider, LogError> { .build()) } -pub async fn logs() -> Result<(), Box<dyn Error>> { +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +pub async fn test_logs() -> Result<()> { + // Make sure the container is running + test_utils::start_collector_container().await?; + let logger_provider = init_logs().unwrap(); let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider); log::set_boxed_logger(Box::new(otel_log_appender))?; @@ -42,6 +49,11 @@ pub async fn logs() -> Result<(), Box<dyn Error>> { info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99); let _ = logger_provider.shutdown(); + + tokio::time::sleep(Duration::from_secs(10)).await; + + assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json"); + Ok(()) } @@ -67,3 +79,12 @@ pub fn test_assert_logs_eq() { let logs = read_logs_from_json(File::open("./expected/logs.json").unwrap()); LogsAsserter::new(logs.clone(), logs).assert(); } + +/// +/// Make sure we stop the collector container, otherwise it will sit around hogging our +/// ports and subsequent test runs will fail. +/// +#[dtor] +fn shutdown() { + test_utils::stop_collector_container(); +} diff --git a/opentelemetry-otlp/tests/integration_test/tests/metrics.rs b/opentelemetry-otlp/tests/integration_test/tests/metrics.rs index 5395c67d58..125c501e14 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/metrics.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/metrics.rs @@ -1,23 +1,332 @@ -use std::{fs::File, io::Write}; +//! OTLP integration tests for metrics +//! Note: these are all expressed using Serde types for the deserialized metrics records. +//! We might consider changing this once we have fixed the issue identified in the #[ignore]d test +//! `test_roundtrip_example_data` - as the roundtripping is currently broken for metrics. +//! +#![cfg(unix)] use anyhow::{Context, Result}; +use ctor::dtor; use integration_test_runner::metrics_asserter::{read_metrics_from_json, MetricsAsserter}; +use integration_test_runner::test_utils; +use integration_test_runner::test_utils::start_collector_container; +use opentelemetry::KeyValue; +use opentelemetry_otlp::MetricExporter; use opentelemetry_proto::tonic::metrics::v1::MetricsData; +use opentelemetry_sdk::metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider}; +use opentelemetry_sdk::Resource; +use serde_json::Value; +use std::fs; +use std::fs::File; +use std::sync::Mutex; +use std::time::Duration; -#[test] -fn test_serde() { - let metrics = read_metrics_from_json(File::open("./expected/metrics.json").unwrap()); +static SETUP_DONE: Mutex<bool> = Mutex::new(false); - let json = serde_json::to_string_pretty(&MetricsData { - resource_metrics: metrics, - }) - .expect("Failed to serialize metrics"); +static RESULT_PATH: &str = "actual/metrics.json"; - // Write to file.
- let mut file = File::create("./expected/serialized_metrics.json").unwrap(); - file.write_all(json.as_bytes()).unwrap(); +/// Initializes the OpenTelemetry metrics pipeline +async fn init_metrics() -> SdkMeterProvider { + let exporter = create_exporter(); - let left = read_metrics_from_json(File::open("./expected/metrics.json").unwrap()); - let right = read_metrics_from_json(File::open("./expected/serialized_metrics.json").unwrap()); + let reader = PeriodicReader::builder(exporter) + .with_interval(Duration::from_millis(500)) + .with_timeout(Duration::from_secs(1)) + .build(); - MetricsAsserter::new(left, right).assert(); + let resource = Resource::builder_empty() + .with_service_name("metrics-integration-test") + .build(); + + let meter_provider = MeterProviderBuilder::default() + .with_resource(resource) + .with_reader(reader) + .build(); + + opentelemetry::global::set_meter_provider(meter_provider.clone()); + + meter_provider +} + +/// +/// Creates an exporter using the appropriate HTTP or gRPC client based on +/// the configured features. +/// +fn create_exporter() -> MetricExporter { + let exporter_builder = MetricExporter::builder(); + + #[cfg(feature = "tonic-client")] + let exporter_builder = exporter_builder.with_tonic(); + #[cfg(not(feature = "tonic-client"))] + #[cfg(any( + feature = "hyper-client", + feature = "reqwest-client", + feature = "reqwest-blocking-client" + ))] + let exporter_builder = exporter_builder.with_http(); + + exporter_builder + .build() + .expect("Failed to build MetricExporter") +} + +/// +/// Retrieves the latest metrics for the given scope. Each test should use +/// its own scope, so that we can easily pull the data for it out from the rest +/// of the data. +/// +/// This will also retrieve the resource attached to the scope. 
+/// +pub fn fetch_latest_metrics_for_scope(scope_name: &str) -> Result { + // Open the file and fetch the contents + let contents = fs::read_to_string(test_utils::METRICS_FILE)?; + + // Find the last parseable metrics line that contains the desired scope + let json_line = contents + .lines() + .rev() + .find_map(|line| { + // Attempt to parse the line as JSON + serde_json::from_str::(line) + .ok() + .and_then(|mut json_line| { + // Check if it contains the specified scope + if let Some(resource_metrics) = json_line + .get_mut("resourceMetrics") + .and_then(|v| v.as_array_mut()) + { + resource_metrics.retain_mut(|resource| { + if let Some(scope_metrics) = resource + .get_mut("scopeMetrics") + .and_then(|v| v.as_array_mut()) + { + scope_metrics.retain(|scope| { + scope + .get("scope") + .and_then(|s| s.get("name")) + .and_then(|name| name.as_str()) + .map_or(false, |n| n == scope_name) + }); + + // Keep the resource only if it has any matching `ScopeMetrics` + !scope_metrics.is_empty() + } else { + false + } + }); + + // If any resource metrics remain, return this line + if !resource_metrics.is_empty() { + return Some(json_line); + } + } + + None + }) + }) + .with_context(|| { + format!( + "No valid JSON line containing scope `{}` found.", + scope_name + ) + })?; + + Ok(json_line) +} + +/// +/// Performs setup for metrics tests +/// +async fn setup_metrics_test() -> Result<()> { + // Make sure the collector container is running + start_collector_container().await?; + + let mut done = SETUP_DONE.lock().unwrap(); + if !*done { + println!("Running setup before any tests..."); + *done = true; // Mark setup as done + + // Initialize the metrics subsystem + _ = init_metrics().await; + } + + // Truncate results + _ = File::create(RESULT_PATH).expect("it's good"); + + Ok(()) +} + +/// +/// Check that the metrics for the given scope match what we expect. This +/// includes zeroing out timestamps, which we reasonably expect not to match. +/// +pub fn validate_metrics_against_results(scope_name: &str) -> Result<()> { + // Define the results file path + let results_file_path = format!("./expected/metrics/{}.json", scope_name); + + // Fetch the actual metrics for the given scope + let actual_metrics = fetch_latest_metrics_for_scope(scope_name) + .context(format!("Failed to fetch metrics for scope: {}", scope_name))?; + + // Read the expected metrics from the results file + let expected_metrics = { + let file = File::open(&results_file_path).context(format!( + "Failed to open results file: {}", + results_file_path + ))?; + read_metrics_from_json(file) + }?; + + // Compare the actual metrics with the expected metrics + MetricsAsserter::new(actual_metrics, expected_metrics).assert(); + + Ok(()) +} + +/// +/// TODO - the HTTP metrics exporters except reqwest-blocking-client do not seem +/// to work at the moment. +/// TODO - fix this asynchronously. +/// +#[cfg(test)] +#[cfg(not(feature = "hyper-client"))] +#[cfg(not(feature = "reqwest-client"))] +mod tests { + + use super::*; + use opentelemetry::metrics::MeterProvider; + + /// + /// Validate JSON/Protobuf models roundtrip correctly. + /// + /// TODO - this test fails currently. Fields disappear, such as the actual value of a given metric. + /// This appears to be on the _deserialization_ side. 
+ /// Issue: https://github.com/open-telemetry/opentelemetry-rust/issues/2434 + /// + #[tokio::test] + #[ignore] + async fn test_roundtrip_example_data() -> Result<()> { + let metrics_in = include_str!("../expected/metrics/test_u64_counter_meter.json"); + let metrics: MetricsData = serde_json::from_str(metrics_in)?; + let metrics_out = serde_json::to_string(&metrics)?; + + println!("{:}", metrics_out); + + let metrics_in_json: Value = serde_json::from_str(metrics_in)?; + let metrics_out_json: Value = serde_json::from_str(&metrics_out)?; + + assert_eq!(metrics_in_json, metrics_out_json); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_u64_counter() -> Result<()> { + let _result_path = setup_metrics_test().await; + const METER_NAME: &str = "test_u64_counter_meter"; + + // Add data to u64_counter + let meter = opentelemetry::global::meter_provider().meter(METER_NAME); + + let counter = meter.u64_counter("counter_u64").build(); + counter.add( + 10, + &[ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ], + ); + + tokio::time::sleep(Duration::from_secs(2)).await; + + // Validate metrics against results file + validate_metrics_against_results(METER_NAME)?; + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + // #[ignore] // skip when running unit test + async fn test_histogram() -> Result<()> { + _ = setup_metrics_test().await; + const METER_NAME: &str = "test_histogram_meter"; + + // Add data to histogram + let meter = opentelemetry::global::meter_provider().meter(METER_NAME); + let histogram = meter.u64_histogram("example_histogram").build(); + histogram.record(42, &[KeyValue::new("mykey3", "myvalue4")]); + tokio::time::sleep(Duration::from_secs(5)).await; + + validate_metrics_against_results(METER_NAME)?; + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + // #[ignore] // skip when running unit test + async fn test_up_down_counter() -> Result<()> { + _ = setup_metrics_test().await; + const METER_NAME: &str = "test_up_down_meter"; + + // Add data to up_down_counter + let meter = opentelemetry::global::meter_provider().meter(METER_NAME); + let up_down_counter = meter.i64_up_down_counter("example_up_down_counter").build(); + up_down_counter.add(-1, &[KeyValue::new("mykey5", "myvalue5")]); + tokio::time::sleep(Duration::from_secs(5)).await; + + validate_metrics_against_results(METER_NAME)?; + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[ignore] + async fn test_flush_on_shutdown() -> Result<()> { + const METER_NAME: &str = "test_flush_on_shutdown"; + + // Set everything up by hand, so that we can shutdown() the exporter + // and make sure our data is flushed through. 
+ + // Make sure the collector is running + start_collector_container().await?; + + // Set up the exporter + let exporter = create_exporter(); + let reader = PeriodicReader::builder(exporter) + .with_interval(Duration::from_secs(30)) + .with_timeout(Duration::from_secs(1)) + .build(); + let resource = Resource::builder_empty() + .with_service_name("metrics-integration-test") + .build(); + let meter_provider = MeterProviderBuilder::default() + .with_resource(resource) + .with_reader(reader) + .build(); + + // Send something + let meter = meter_provider.meter(METER_NAME); + let counter = meter.u64_counter("counter_").build(); + counter.add(123, &[]); + + // Shutdown + meter_provider.shutdown()?; + + // We still need to sleep, to give otel-collector a chance to flush to disk + tokio::time::sleep(Duration::from_secs(2)).await; + + validate_metrics_against_results(METER_NAME)?; + + Ok(()) + } +} + +/// +/// Make sure we stop the collector container, otherwise it will sit around hogging our +/// ports and subsequent test runs will fail. +/// +#[dtor] +fn shutdown() { + println!("metrics::shutdown"); + test_utils::stop_collector_container(); } diff --git a/opentelemetry-otlp/tests/integration_test/tests/traces.rs b/opentelemetry-otlp/tests/integration_test/tests/traces.rs index 20a2bb15a5..1601e04132 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/traces.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/traces.rs @@ -9,12 +9,16 @@ use opentelemetry::{ }; use opentelemetry_otlp::SpanExporter; +use anyhow::Result; +use ctor::dtor; +use integration_test_runner::test_utils; use opentelemetry_proto::tonic::trace::v1::TracesData; use opentelemetry_sdk::{runtime, trace as sdktrace, Resource}; -use std::error::Error; use std::fs::File; use std::io::Write; use std::os::unix::fs::MetadataExt; +use std::time::Duration; +use tokio::time::sleep; fn init_tracer_provider() -> Result { let exporter_builder = SpanExporter::builder(); @@ -43,7 +47,10 @@ fn init_tracer_provider() -> Result { const LEMONS_KEY: Key = Key::from_static_str("lemons"); const ANOTHER_KEY: Key = Key::from_static_str("ex.com/another"); -pub async fn traces() -> Result<(), Box> { +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +pub async fn traces() -> Result<()> { + test_utils::start_collector_container().await?; + let tracer_provider = init_tracer_provider().expect("Failed to initialize tracer provider."); global::set_tracer_provider(tracer_provider.clone()); @@ -67,42 +74,51 @@ pub async fn traces() -> Result<(), Box> { tracer_provider.shutdown()?; + // Give it a second to flush + sleep(Duration::from_secs(2)).await; + + // Validate results + assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?; + Ok(()) } -pub fn assert_traces_results(result: &str, expected: &str) { - let left = read_spans_from_json(File::open(expected).unwrap()); - let right = read_spans_from_json(File::open(result).unwrap()); +pub fn assert_traces_results(result: &str, expected: &str) -> Result<()> { + let left = read_spans_from_json(File::open(expected)?)?; + let right = read_spans_from_json(File::open(result)?)?; TraceAsserter::new(left, right).assert(); // we cannot read result json file because the timestamp was represents as string instead of u64. 
    // need to fix this in the json file exporter
+    assert!(File::open(result)?.metadata()?.size() > 0);
 
-    assert!(File::open(result).unwrap().metadata().unwrap().size() > 0)
+    Ok(())
 }
 
 #[test]
 #[should_panic(expected = "left: \"Sub operation...\"")] // we swap the parent spans with child spans in failed_traces.json
 pub fn test_assert_span_eq_failure() {
-    let left = read_spans_from_json(File::open("./expected/traces.json").unwrap());
-    let right = read_spans_from_json(File::open("./expected/failed_traces.json").unwrap());
+    let left = read_spans_from_json(File::open("./expected/traces.json").unwrap()).unwrap();
+    let right = read_spans_from_json(File::open("./expected/failed_traces.json").unwrap()).unwrap();
 
     TraceAsserter::new(right, left).assert();
 }
 
 #[test]
-pub fn test_assert_span_eq() {
-    let spans = read_spans_from_json(File::open("./expected/traces.json").unwrap());
+pub fn test_assert_span_eq() -> Result<()> {
+    let spans = read_spans_from_json(File::open("./expected/traces.json")?)?;
 
     TraceAsserter::new(spans.clone(), spans).assert();
+
+    Ok(())
 }
 
 #[test]
-pub fn test_serde() {
+pub fn test_serde() -> Result<()> {
     let spans = read_spans_from_json(
         File::open("./expected/traces.json").expect("Failed to read traces.json"),
-    );
+    )?;
     let json = serde_json::to_string_pretty(&TracesData {
         resource_spans: spans,
     })
@@ -114,11 +130,22 @@ pub fn test_serde() {
 
     let left = read_spans_from_json(
         File::open("./expected/traces.json").expect("Failed to read traces.json"),
-    );
+    )?;
     let right = read_spans_from_json(
         File::open("./expected/serialized_traces.json")
             .expect("Failed to read serialized_traces.json"),
-    );
+    )?;
 
     TraceAsserter::new(left, right).assert();
+
+    Ok(())
+}
+
+///
+/// Make sure we stop the collector container, otherwise it will sit around hogging our
+/// ports and subsequent test runs will fail.
+///
+#[dtor]
+fn shutdown() {
+    test_utils::stop_collector_container();
 }
diff --git a/opentelemetry-proto/CHANGELOG.md b/opentelemetry-proto/CHANGELOG.md
index 0d8d14866b..3fc9721c5c 100644
--- a/opentelemetry-proto/CHANGELOG.md
+++ b/opentelemetry-proto/CHANGELOG.md
@@ -4,6 +4,8 @@
 
 - Update proto definitions to v1.4.0 [#2315](https://github.com/open-telemetry/opentelemetry-rust/pull/2315)
 - Bump msrv to 1.75.0.
+- Update proto definitions to v1.5.0 [#2439](https://github.com/open-telemetry/opentelemetry-rust/pull/2439)
+
 
 ## 0.27.0
 
diff --git a/opentelemetry-proto/src/proto/opentelemetry-proto b/opentelemetry-proto/src/proto/opentelemetry-proto
index 0adf6aac00..2bd940b2b7 160000
--- a/opentelemetry-proto/src/proto/opentelemetry-proto
+++ b/opentelemetry-proto/src/proto/opentelemetry-proto
@@ -1 +1 @@
-Subproject commit 0adf6aac004578b28267394514b2e55ee9cc012f
+Subproject commit 2bd940b2b77c1ab57c27166af21384906da7bb2b
diff --git a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.logs.v1.rs b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.logs.v1.rs
index 0eb691fb3a..04f1c6dad4 100644
--- a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.logs.v1.rs
+++ b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.logs.v1.rs
@@ -37,7 +37,8 @@ pub struct ResourceLogs {
     #[prost(message, repeated, tag = "2")]
     pub scope_logs: ::prost::alloc::vec::Vec<ScopeLogs>,
     /// The Schema URL, if known. This is the identifier of the Schema that the resource data
-    /// is recorded in. To learn more about Schema URL see
+    /// is recorded in. Notably, the last part of the URL path is the version number of the
+    /// schema: http\[s\]://server\[:port\]/path/<version>.
To learn more about Schema URL see
     ///
     /// This schema_url applies to the data in the "resource" field. It does not apply
     /// to the data in the "scope_logs" field which have their own schema_url field.
@@ -60,7 +61,8 @@ pub struct ScopeLogs {
     #[prost(message, repeated, tag = "2")]
     pub log_records: ::prost::alloc::vec::Vec<LogRecord>,
     /// The Schema URL, if known. This is the identifier of the Schema that the log data
-    /// is recorded in. To learn more about Schema URL see
+    /// is recorded in. Notably, the last part of the URL path is the version number of the
+    /// schema: http\[s\]://server\[:port\]/path/<version>. To learn more about Schema URL see
     ///
     /// This schema_url applies to all logs in the "logs" field.
     #[prost(string, tag = "3")]
@@ -178,6 +180,20 @@ pub struct LogRecord {
         )
     )]
     pub span_id: ::prost::alloc::vec::Vec<u8>,
+    /// A unique identifier of event category/type.
+    /// All events with the same event_name are expected to conform to the same
+    /// schema for both their attributes and their body.
+    ///
+    /// Recommended to be fully qualified and short (no longer than 256 characters).
+    ///
+    /// Presence of event_name on the log record identifies this record
+    /// as an event.
+    ///
+    /// \[Optional\].
+    ///
+    /// Status: \[Development\]
+    #[prost(string, tag = "12")]
+    pub event_name: ::prost::alloc::string::String,
 }
 /// Possible values for LogRecord.SeverityNumber.
 #[cfg_attr(feature = "with-schemars", derive(schemars::JsonSchema))]
diff --git a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.metrics.v1.rs b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.metrics.v1.rs
index 66e316689d..fb04bec6cc 100644
--- a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.metrics.v1.rs
+++ b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.metrics.v1.rs
@@ -55,7 +55,8 @@ pub struct ResourceMetrics {
     #[prost(message, repeated, tag = "2")]
     pub scope_metrics: ::prost::alloc::vec::Vec<ScopeMetrics>,
     /// The Schema URL, if known. This is the identifier of the Schema that the resource data
-    /// is recorded in. To learn more about Schema URL see
+    /// is recorded in. Notably, the last part of the URL path is the version number of the
+    /// schema: http\[s\]://server\[:port\]/path/<version>. To learn more about Schema URL see
     ///
     /// This schema_url applies to the data in the "resource" field. It does not apply
     /// to the data in the "scope_metrics" field which have their own schema_url field.
@@ -78,7 +79,8 @@ pub struct ScopeMetrics {
     #[prost(message, repeated, tag = "2")]
     pub metrics: ::prost::alloc::vec::Vec<Metric>,
     /// The Schema URL, if known. This is the identifier of the Schema that the metric data
-    /// is recorded in. To learn more about Schema URL see
+    /// is recorded in. Notably, the last part of the URL path is the version number of the
+    /// schema: http\[s\]://server\[:port\]/path/<version>. To learn more about Schema URL see
     ///
     /// This schema_url applies to all metrics in the "metrics" field.
     #[prost(string, tag = "3")]
@@ -437,7 +439,7 @@ pub struct HistogramDataPoint {
     /// events, and is assumed to be monotonic over the values of these events.
     /// Negative events *can* be recorded, but sum should not be filled out when
     /// doing so. This is specifically to enforce compatibility w/ OpenMetrics,
-    /// see:
+    /// see:
     #[prost(double, optional, tag = "5")]
     pub sum: ::core::option::Option<f64>,
     /// bucket_counts is an optional field contains the count values of histogram
@@ -520,7 +522,7 @@ pub struct ExponentialHistogramDataPoint {
     /// events, and is assumed to be monotonic over the values of these events.
    /// Negative events *can* be recorded, but sum should not be filled out when
     /// doing so. This is specifically to enforce compatibility w/ OpenMetrics,
-    /// see:
+    /// see:
     #[prost(double, optional, tag = "5")]
     pub sum: ::core::option::Option<f64>,
     /// scale describes the resolution of the histogram. Boundaries are
@@ -643,7 +645,7 @@ pub struct SummaryDataPoint {
     /// events, and is assumed to be monotonic over the values of these events.
     /// Negative events *can* be recorded, but sum should not be filled out when
     /// doing so. This is specifically to enforce compatibility w/ OpenMetrics,
-    /// see:
+    /// see:
     #[prost(double, tag = "5")]
     pub sum: f64,
     /// (Optional) list of values at different quantiles of the distribution calculated
diff --git a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.trace.v1.rs b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.trace.v1.rs
index d51a470d13..2e37483e94 100644
--- a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.trace.v1.rs
+++ b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.trace.v1.rs
@@ -37,7 +37,8 @@ pub struct ResourceSpans {
     #[prost(message, repeated, tag = "2")]
     pub scope_spans: ::prost::alloc::vec::Vec<ScopeSpans>,
     /// The Schema URL, if known. This is the identifier of the Schema that the resource data
-    /// is recorded in. To learn more about Schema URL see
+    /// is recorded in. Notably, the last part of the URL path is the version number of the
+    /// schema: http\[s\]://server\[:port\]/path/<version>. To learn more about Schema URL see
     ///
     /// This schema_url applies to the data in the "resource" field. It does not apply
     /// to the data in the "scope_spans" field which have their own schema_url field.
@@ -60,7 +61,8 @@ pub struct ScopeSpans {
     #[prost(message, repeated, tag = "2")]
     pub spans: ::prost::alloc::vec::Vec<Span>,
     /// The Schema URL, if known. This is the identifier of the Schema that the span data
-    /// is recorded in. To learn more about Schema URL see
+    /// is recorded in. Notably, the last part of the URL path is the version number of the
+    /// schema: http\[s\]://server\[:port\]/path/<version>. To learn more about Schema URL see
     ///
     /// This schema_url applies to all spans and span events in the "spans" field.
#[prost(string, tag = "3")] diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index 0f2e614550..785bdfd97f 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -118,6 +118,7 @@ pub mod tonic { #[cfg(not(feature = "populate-logs-event-name"))] attributes }, + event_name: log_record.event_name().unwrap_or_default().into(), severity_number: severity_number.into(), severity_text: log_record .severity_text() diff --git a/opentelemetry-proto/tests/json_serde.rs b/opentelemetry-proto/tests/json_serde.rs index 3295e682c1..389541cce7 100644 --- a/opentelemetry-proto/tests/json_serde.rs +++ b/opentelemetry-proto/tests/json_serde.rs @@ -1187,6 +1187,7 @@ mod json_serde { "Example log record", ))), }), + event_name: "test_log_event".to_string(), attributes: vec![ KeyValue { key: String::from("string.attribute"), @@ -1359,7 +1360,8 @@ mod json_serde { "droppedAttributesCount": 0, "flags": 0, "traceId": "5b8efff798038103d269b633813fc60c", - "spanId": "eee19b7ec3c1b174" + "spanId": "eee19b7ec3c1b174", + "eventName": "test_log_event" } ], "schemaUrl": "" @@ -1465,7 +1467,8 @@ mod json_serde { } } } - ] + ], + "eventName": "test_log_event" } ] } diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index edfc50d9d1..aaad464747 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -97,60 +97,56 @@ impl Default for AggregateTimeInitiator { } } +type Filter = Arc bool + Send + Sync>; + +/// Applies filter on provided attribute set +/// No-op, if filter is not set +#[derive(Clone)] +pub(crate) struct AttributeSetFilter { + filter: Option, +} + +impl AttributeSetFilter { + pub(crate) fn new(filter: Option) -> Self { + Self { filter } + } + + pub(crate) fn apply(&self, attrs: &[KeyValue], run: impl FnOnce(&[KeyValue])) { + if let Some(filter) = &self.filter { + let filtered_attrs: Vec = + attrs.iter().filter(|kv| filter(kv)).cloned().collect(); + run(&filtered_attrs); + } else { + run(attrs); + }; + } +} + /// Builds aggregate functions pub(crate) struct AggregateBuilder { /// The temporality used for the returned aggregate functions. - /// - /// If this is not provided, a default of cumulative will be used (except for the - /// last-value aggregate function where delta is the only appropriate - /// temporality). - temporality: Option, + temporality: Temporality, /// The attribute filter the aggregate function will use on the input of /// measurements. - filter: Option, + filter: AttributeSetFilter, _marker: marker::PhantomData, } -type Filter = Arc bool + Send + Sync>; - impl AggregateBuilder { - pub(crate) fn new(temporality: Option, filter: Option) -> Self { + pub(crate) fn new(temporality: Temporality, filter: Option) -> Self { AggregateBuilder { temporality, - filter, + filter: AttributeSetFilter::new(filter), _marker: marker::PhantomData, } } - /// Wraps the passed in measure with an attribute filtering function. - fn filter(&self, f: impl Measure) -> impl Measure { - let filter = self.filter.clone(); - move |n, attrs: &[KeyValue]| { - if let Some(filter) = &filter { - let filtered_attrs: Vec = - attrs.iter().filter(|kv| filter(kv)).cloned().collect(); - f.call(n, &filtered_attrs); - } else { - f.call(n, attrs); - }; - } - } - /// Builds a last-value aggregate function input and output. 
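Back in the logs transform above, the new `event_name` plumbing can be sanity-checked end to end: the generated field serializes as camelCase `eventName`, which is exactly what the updated `json_serde` expectations encode. A minimal sketch (assuming `opentelemetry-proto` with its `gen-tonic` and `with-serde` features plus `serde_json`):

```rust
use opentelemetry_proto::tonic::logs::v1::LogRecord;

fn main() {
    let record = LogRecord {
        event_name: "test_log_event".to_string(),
        // All other fields keep their prost-generated defaults.
        ..Default::default()
    };
    let json = serde_json::to_string(&record).expect("LogRecord should serialize");
    // The serde layer renames event_name to eventName, as in the expected JSON above.
    assert!(json.contains(r#""eventName":"test_log_event""#));
}
```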
pub(crate) fn last_value(&self) -> (impl Measure, impl ComputeAggregation) { - let lv = Arc::new(LastValue::new()); - let agg_lv = Arc::clone(&lv); - let t = self.temporality; - - ( - self.filter(move |n, a: &[KeyValue]| lv.measure(n, a)), - move |dest: Option<&mut dyn Aggregation>| match t { - Some(Temporality::Delta) => agg_lv.delta(dest), - _ => agg_lv.cumulative(dest), - }, - ) + let lv = Arc::new(LastValue::new(self.temporality, self.filter.clone())); + (lv.clone(), lv) } /// Builds a precomputed sum aggregate function input and output. @@ -158,32 +154,20 @@ impl AggregateBuilder { &self, monotonic: bool, ) -> (impl Measure, impl ComputeAggregation) { - let s = Arc::new(PrecomputedSum::new(monotonic)); - let agg_sum = Arc::clone(&s); - let t = self.temporality; - - ( - self.filter(move |n, a: &[KeyValue]| s.measure(n, a)), - move |dest: Option<&mut dyn Aggregation>| match t { - Some(Temporality::Delta) => agg_sum.delta(dest), - _ => agg_sum.cumulative(dest), - }, - ) + let s = Arc::new(PrecomputedSum::new( + self.temporality, + self.filter.clone(), + monotonic, + )); + + (s.clone(), s) } /// Builds a sum aggregate function input and output. pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure, impl ComputeAggregation) { - let s = Arc::new(Sum::new(monotonic)); - let agg_sum = Arc::clone(&s); - let t = self.temporality; - - ( - self.filter(move |n, a: &[KeyValue]| s.measure(n, a)), - move |dest: Option<&mut dyn Aggregation>| match t { - Some(Temporality::Delta) => agg_sum.delta(dest), - _ => agg_sum.cumulative(dest), - }, - ) + let s = Arc::new(Sum::new(self.temporality, self.filter.clone(), monotonic)); + + (s.clone(), s) } /// Builds a histogram aggregate function input and output. @@ -193,17 +177,15 @@ impl AggregateBuilder { record_min_max: bool, record_sum: bool, ) -> (impl Measure, impl ComputeAggregation) { - let h = Arc::new(Histogram::new(boundaries, record_min_max, record_sum)); - let agg_h = Arc::clone(&h); - let t = self.temporality; - - ( - self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), - move |dest: Option<&mut dyn Aggregation>| match t { - Some(Temporality::Delta) => agg_h.delta(dest), - _ => agg_h.cumulative(dest), - }, - ) + let h = Arc::new(Histogram::new( + self.temporality, + self.filter.clone(), + boundaries, + record_min_max, + record_sum, + )); + + (h.clone(), h) } /// Builds an exponential histogram aggregate function input and output. 
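All of the builder methods above now just clone `AttributeSetFilter` into the aggregator. Its `apply` contract is small enough to mirror standalone; the `Filter` alias below matches the SDK's, the rest is illustrative:

```rust
use std::sync::Arc;

use opentelemetry::KeyValue;

type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;

fn apply(filter: &Option<Filter>, attrs: &[KeyValue], run: impl FnOnce(&[KeyValue])) {
    if let Some(filter) = filter {
        // Allocate a filtered copy only when a filter is configured...
        let filtered: Vec<KeyValue> = attrs.iter().filter(|kv| filter(kv)).cloned().collect();
        run(&filtered);
    } else {
        // ...and pass the original slice through untouched otherwise.
        run(attrs);
    }
}

fn main() {
    let attrs = [KeyValue::new("region", "eu"), KeyValue::new("pid", 42)];
    let only_region: Option<Filter> = Some(Arc::new(|kv: &KeyValue| kv.key.as_str() == "region"));

    apply(&only_region, &attrs, |kept| assert_eq!(kept.len(), 1));
    apply(&None, &attrs, |kept| assert_eq!(kept.len(), 2));
}
```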
@@ -215,21 +197,15 @@ impl AggregateBuilder { record_sum: bool, ) -> (impl Measure, impl ComputeAggregation) { let h = Arc::new(ExpoHistogram::new( + self.temporality, + self.filter.clone(), max_size, max_scale, record_min_max, record_sum, )); - let agg_h = Arc::clone(&h); - let t = self.temporality; - - ( - self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), - move |dest: Option<&mut dyn Aggregation>| match t { - Some(Temporality::Delta) => agg_h.delta(dest), - _ => agg_h.cumulative(dest), - }, - ) + + (h.clone(), h) } } @@ -245,7 +221,8 @@ mod tests { #[test] fn last_value_aggregation() { - let (measure, agg) = AggregateBuilder::::new(None, None).last_value(); + let (measure, agg) = + AggregateBuilder::::new(Temporality::Cumulative, None).last_value(); let mut a = Gauge { data_points: vec![GaugeDataPoint { attributes: vec![KeyValue::new("a", 1)], @@ -271,7 +248,7 @@ mod tests { fn precomputed_sum_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { let (measure, agg) = - AggregateBuilder::::new(Some(temporality), None).precomputed_sum(true); + AggregateBuilder::::new(temporality, None).precomputed_sum(true); let mut a = Sum { data_points: vec![ SumDataPoint { @@ -312,7 +289,7 @@ mod tests { #[test] fn sum_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = AggregateBuilder::::new(Some(temporality), None).sum(true); + let (measure, agg) = AggregateBuilder::::new(temporality, None).sum(true); let mut a = Sum { data_points: vec![ SumDataPoint { @@ -353,7 +330,7 @@ mod tests { #[test] fn explicit_bucket_histogram_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = AggregateBuilder::::new(Some(temporality), None) + let (measure, agg) = AggregateBuilder::::new(temporality, None) .explicit_bucket_histogram(vec![1.0], true, true); let mut a = Histogram { data_points: vec![HistogramDataPoint { @@ -396,7 +373,7 @@ mod tests { #[test] fn exponential_histogram_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = AggregateBuilder::::new(Some(temporality), None) + let (measure, agg) = AggregateBuilder::::new(temporality, None) .exponential_bucket_histogram(4, 20, true, true); let mut a = ExponentialHistogram { data_points: vec![ExponentialHistogramDataPoint { diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs index 42f3794ad1..b54d78bace 100644 --- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs @@ -1,4 +1,9 @@ -use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex}; +use std::{ + f64::consts::LOG2_E, + mem::replace, + ops::DerefMut, + sync::{Arc, Mutex}, +}; use opentelemetry::{otel_debug, KeyValue}; use std::sync::OnceLock; @@ -8,7 +13,10 @@ use crate::metrics::{ Temporality, }; -use super::{aggregate::AggregateTimeInitiator, Aggregator, Number, ValueMap}; +use super::{ + aggregate::{AggregateTimeInitiator, AttributeSetFilter}, + Aggregator, ComputeAggregation, Measure, Number, ValueMap, +}; pub(crate) const EXPO_MAX_SCALE: i8 = 20; pub(crate) const EXPO_MIN_SCALE: i8 = -10; @@ -351,6 +359,8 @@ struct BucketConfig { pub(crate) struct ExpoHistogram { value_map: ValueMap>>, init_time: AggregateTimeInitiator, + temporality: Temporality, + filter: AttributeSetFilter, record_sum: bool, record_min_max: bool, } @@ -358,6 
+368,8 @@ pub(crate) struct ExpoHistogram { impl ExpoHistogram { /// Create a new exponential histogram. pub(crate) fn new( + temporality: Temporality, + filter: AttributeSetFilter, max_size: u32, max_scale: i8, record_min_max: bool, @@ -368,27 +380,15 @@ impl ExpoHistogram { max_size: max_size as i32, max_scale, }), + init_time: AggregateTimeInitiator::default(), + temporality, + filter, record_sum, record_min_max, - init_time: AggregateTimeInitiator::default(), } } - pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) { - let f_value = value.into_float(); - // Ignore NaN and infinity. - // Only makes sense if T is f64, maybe this could be no-op for other cases? - if !f_value.is_finite() { - return; - } - - self.value_map.measure(value, attrs); - } - - pub(crate) fn delta( - &self, - dest: Option<&mut dyn Aggregation>, - ) -> (usize, Option>) { + fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { let time = self.init_time.delta(); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); @@ -442,7 +442,7 @@ impl ExpoHistogram { (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } - pub(crate) fn cumulative( + fn cumulative( &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { @@ -500,6 +500,36 @@ impl ExpoHistogram { } } +impl Measure for Arc> +where + T: Number, +{ + fn call(&self, measurement: T, attrs: &[KeyValue]) { + let f_value = measurement.into_float(); + // Ignore NaN and infinity. + // Only makes sense if T is f64, maybe this could be no-op for other cases? + if !f_value.is_finite() { + return; + } + + self.filter.apply(attrs, |filtered| { + self.value_map.measure(measurement, filtered); + }) + } +} + +impl ComputeAggregation for Arc> +where + T: Number, +{ + fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + match self.temporality { + Temporality::Delta => self.delta(dest), + _ => self.cumulative(dest), + } + } +} + #[cfg(test)] mod tests { use std::{ops::Neg, time::SystemTime}; @@ -665,9 +695,16 @@ mod tests { ]; for test in test_cases { - let h = ExpoHistogram::new(4, 20, true, true); + let h = Arc::new(ExpoHistogram::new( + Temporality::Cumulative, + AttributeSetFilter::new(None), + 4, + 20, + true, + true, + )); for v in test.values { - h.measure(v, &[]); + Measure::call(&h, v, &[]); } let dp = h.value_map.no_attribute_tracker.lock().unwrap(); @@ -714,9 +751,16 @@ mod tests { ]; for test in test_cases { - let h = ExpoHistogram::new(4, 20, true, true); + let h = Arc::new(ExpoHistogram::new( + Temporality::Cumulative, + AttributeSetFilter::new(None), + 4, + 20, + true, + true, + )); for v in test.values { - h.measure(v, &[]); + Measure::call(&h, v, &[]); } let dp = h.value_map.no_attribute_tracker.lock().unwrap(); @@ -1241,7 +1285,7 @@ mod tests { name: "Delta Single", build: Box::new(move || { box_val( - AggregateBuilder::new(Some(Temporality::Delta), None) + AggregateBuilder::new(Temporality::Delta, None) .exponential_bucket_histogram( max_size, max_scale, @@ -1284,7 +1328,7 @@ mod tests { name: "Cumulative Single", build: Box::new(move || { box_val( - internal::AggregateBuilder::new(Some(Temporality::Cumulative), None) + internal::AggregateBuilder::new(Temporality::Cumulative, None) .exponential_bucket_histogram( max_size, max_scale, @@ -1327,7 +1371,7 @@ mod tests { name: "Delta Multiple", build: Box::new(move || { box_val( - internal::AggregateBuilder::new(Some(Temporality::Delta), None) + internal::AggregateBuilder::new(Temporality::Delta, None) .exponential_bucket_histogram( max_size, 
max_scale, @@ -1373,7 +1417,7 @@ mod tests { name: "Cumulative Multiple ", build: Box::new(move || { box_val( - internal::AggregateBuilder::new(Some(Temporality::Cumulative), None) + internal::AggregateBuilder::new(Temporality::Cumulative, None) .exponential_bucket_histogram( max_size, max_scale, diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index f535566ecf..c79f9e4899 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -1,5 +1,6 @@ use std::mem::replace; use std::ops::DerefMut; +use std::sync::Arc; use std::sync::Mutex; use crate::metrics::data::HistogramDataPoint; @@ -8,6 +9,9 @@ use crate::metrics::Temporality; use opentelemetry::KeyValue; use super::aggregate::AggregateTimeInitiator; +use super::aggregate::AttributeSetFilter; +use super::ComputeAggregation; +use super::Measure; use super::ValueMap; use super::{Aggregator, Number}; @@ -68,15 +72,23 @@ impl Buckets { /// buckets. pub(crate) struct Histogram { value_map: ValueMap>>, + init_time: AggregateTimeInitiator, + temporality: Temporality, + filter: AttributeSetFilter, bounds: Vec, record_min_max: bool, record_sum: bool, - init_time: AggregateTimeInitiator, } impl Histogram { #[allow(unused_mut)] - pub(crate) fn new(mut bounds: Vec, record_min_max: bool, record_sum: bool) -> Self { + pub(crate) fn new( + temporality: Temporality, + filter: AttributeSetFilter, + mut bounds: Vec, + record_min_max: bool, + record_sum: bool, + ) -> Self { #[cfg(feature = "spec_unstable_metrics_views")] { // TODO: When views are used, validate this upfront @@ -87,30 +99,18 @@ impl Histogram { let buckets_count = bounds.len() + 1; Histogram { value_map: ValueMap::new(buckets_count), + init_time: AggregateTimeInitiator::default(), + temporality, + filter, bounds, record_min_max, record_sum, - init_time: AggregateTimeInitiator::default(), } } - pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { - let f = measurement.into_float(); - // This search will return an index in the range `[0, bounds.len()]`, where - // it will return `bounds.len()` if value is greater than the last element - // of `bounds`. This aligns with the buckets in that the length of buckets - // is `bounds.len()+1`, with the last bucket representing: - // `(bounds[bounds.len()-1], +∞)`. - let index = self.bounds.partition_point(|&x| x < f); - - self.value_map.measure((measurement, index), attrs); - } - - pub(crate) fn delta( - &self, - dest: Option<&mut dyn Aggregation>, - ) -> (usize, Option>) { + fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { let time = self.init_time.delta(); + let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::Histogram { @@ -157,7 +157,7 @@ impl Histogram { (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } - pub(crate) fn cumulative( + fn cumulative( &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { @@ -209,17 +209,54 @@ impl Histogram { } } +impl Measure for Arc> +where + T: Number, +{ + fn call(&self, measurement: T, attrs: &[KeyValue]) { + let f = measurement.into_float(); + // This search will return an index in the range `[0, bounds.len()]`, where + // it will return `bounds.len()` if value is greater than the last element + // of `bounds`. This aligns with the buckets in that the length of buckets + // is `bounds.len()+1`, with the last bucket representing: + // `(bounds[bounds.len()-1], +∞)`. 
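The bucket arithmetic described in the comment above can be checked in isolation (illustrative, not part of the patch):

```rust
fn main() {
    // bounds.len() == 3 gives bounds.len() + 1 == 4 buckets; buckets are
    // upper-inclusive because the predicate is strictly `x < f`.
    let bounds = [1.0_f64, 3.0, 6.0];
    assert_eq!(bounds.partition_point(|&x| x < 0.5), 0); // 0.5 -> (-inf, 1.0]
    assert_eq!(bounds.partition_point(|&x| x < 3.0), 1); // 3.0 -> (1.0, 3.0]
    assert_eq!(bounds.partition_point(|&x| x < 4.5), 2); // 4.5 -> (3.0, 6.0]
    assert_eq!(bounds.partition_point(|&x| x < 7.0), 3); // 7.0 -> (6.0, +inf)
}
```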
+ let index = self.bounds.partition_point(|&x| x < f); + + self.filter.apply(attrs, |filtered| { + self.value_map.measure((measurement, index), filtered); + }) + } +} + +impl ComputeAggregation for Arc> +where + T: Number, +{ + fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + match self.temporality { + Temporality::Delta => self.delta(dest), + _ => self.cumulative(dest), + } + } +} + #[cfg(test)] mod tests { use super::*; #[test] fn check_buckets_are_selected_correctly() { - let hist = Histogram::::new(vec![1.0, 3.0, 6.0], false, false); + let hist = Arc::new(Histogram::::new( + Temporality::Cumulative, + AttributeSetFilter::new(None), + vec![1.0, 3.0, 6.0], + false, + false, + )); for v in 1..11 { - hist.measure(v, &[]); + Measure::call(&hist, v, &[]); } - let (count, dp) = hist.cumulative(None); + let (count, dp) = ComputeAggregation::call(&hist, None); let dp = dp.unwrap(); let dp = dp.as_any().downcast_ref::>().unwrap(); assert_eq!(count, 1); diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index b140fdbc40..8fbd92f44a 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -1,9 +1,14 @@ -use crate::metrics::data::{self, Aggregation, GaugeDataPoint}; +use std::sync::Arc; + +use crate::metrics::{ + data::{self, Aggregation, GaugeDataPoint}, + Temporality, +}; use opentelemetry::KeyValue; use super::{ - aggregate::AggregateTimeInitiator, Aggregator, AtomicTracker, AtomicallyUpdate, Number, - ValueMap, + aggregate::{AggregateTimeInitiator, AttributeSetFilter}, + Aggregator, AtomicTracker, AtomicallyUpdate, ComputeAggregation, Measure, Number, ValueMap, }; /// this is reused by PrecomputedSum @@ -42,21 +47,20 @@ where pub(crate) struct LastValue { value_map: ValueMap>, init_time: AggregateTimeInitiator, + temporality: Temporality, + filter: AttributeSetFilter, } impl LastValue { - pub(crate) fn new() -> Self { + pub(crate) fn new(temporality: Temporality, filter: AttributeSetFilter) -> Self { LastValue { value_map: ValueMap::new(()), init_time: AggregateTimeInitiator::default(), + temporality, + filter, } } - pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { - // The argument index is not applicable to LastValue. 
- self.value_map.measure(measurement, attrs); - } - pub(crate) fn delta( &self, dest: Option<&mut dyn Aggregation>, @@ -123,3 +127,26 @@ impl LastValue { ) } } + +impl Measure for Arc> +where + T: Number, +{ + fn call(&self, measurement: T, attrs: &[KeyValue]) { + self.filter.apply(attrs, |filtered| { + self.value_map.measure(measurement, filtered); + }) + } +} + +impl ComputeAggregation for Arc> +where + T: Number, +{ + fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + match self.temporality { + Temporality::Delta => self.delta(dest), + _ => self.cumulative(dest), + } + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index 7b67147011..6421d85f94 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -3,33 +3,38 @@ use opentelemetry::KeyValue; use crate::metrics::data::{self, Aggregation, SumDataPoint}; use crate::metrics::Temporality; -use super::aggregate::AggregateTimeInitiator; +use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter}; use super::{last_value::Assign, AtomicTracker, Number, ValueMap}; +use super::{ComputeAggregation, Measure}; +use std::sync::Arc; use std::{collections::HashMap, sync::Mutex}; /// Summarizes a set of pre-computed sums as their arithmetic sum. pub(crate) struct PrecomputedSum { value_map: ValueMap>, - monotonic: bool, init_time: AggregateTimeInitiator, + temporality: Temporality, + filter: AttributeSetFilter, + monotonic: bool, reported: Mutex, T>>, } impl PrecomputedSum { - pub(crate) fn new(monotonic: bool) -> Self { + pub(crate) fn new( + temporality: Temporality, + filter: AttributeSetFilter, + monotonic: bool, + ) -> Self { PrecomputedSum { value_map: ValueMap::new(()), - monotonic, init_time: AggregateTimeInitiator::default(), + temporality, + filter, + monotonic, reported: Mutex::new(Default::default()), } } - pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { - // The argument index is not applicable to PrecomputedSum. - self.value_map.measure(measurement, attrs); - } - pub(crate) fn delta( &self, dest: Option<&mut dyn Aggregation>, @@ -118,3 +123,26 @@ impl PrecomputedSum { ) } } + +impl Measure for Arc> +where + T: Number, +{ + fn call(&self, measurement: T, attrs: &[KeyValue]) { + self.filter.apply(attrs, |filtered| { + self.value_map.measure(measurement, filtered); + }) + } +} + +impl ComputeAggregation for Arc> +where + T: Number, +{ + fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + match self.temporality { + Temporality::Delta => self.delta(dest), + _ => self.cumulative(dest), + } + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 5f51be79c2..7de2f7d2b5 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,11 +1,12 @@ +use std::sync::Arc; use std::vec; use crate::metrics::data::{self, Aggregation, SumDataPoint}; use crate::metrics::Temporality; use opentelemetry::KeyValue; -use super::aggregate::AggregateTimeInitiator; -use super::{Aggregator, AtomicTracker, Number}; +use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter}; +use super::{Aggregator, AtomicTracker, ComputeAggregation, Measure, Number}; use super::{AtomicallyUpdate, ValueMap}; struct Increment @@ -42,8 +43,10 @@ where /// Summarizes a set of measurements made as their arithmetic sum. 
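The same restructuring repeats for `LastValue` and `PrecomputedSum` above and for `Sum` below: `Measure` and `ComputeAggregation` are implemented directly on `Arc<Self>`, so the builder can return one allocation as both halves. The shape of the trick, with illustrative local traits:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

trait Measure {
    fn record(&self, value: u64);
}
trait ComputeAggregation {
    fn collect(&self) -> u64;
}

struct Counter {
    total: AtomicU64,
}

// Implementing the traits on Arc<Counter> (rather than Counter) lets a builder
// hand out the *same* allocation as both the input half and the output half,
// mirroring the `(s.clone(), s)` returns in this diff.
impl Measure for Arc<Counter> {
    fn record(&self, value: u64) {
        self.total.fetch_add(value, Ordering::Relaxed);
    }
}
impl ComputeAggregation for Arc<Counter> {
    fn collect(&self) -> u64 {
        self.total.load(Ordering::Relaxed)
    }
}

fn build() -> (impl Measure, impl ComputeAggregation) {
    let c = Arc::new(Counter { total: AtomicU64::new(0) });
    (c.clone(), c)
}

fn main() {
    let (input, output) = build();
    input.record(5);
    input.record(7);
    assert_eq!(output.collect(), 12);
}
```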
pub(crate) struct Sum { value_map: ValueMap>, - monotonic: bool, init_time: AggregateTimeInitiator, + temporality: Temporality, + filter: AttributeSetFilter, + monotonic: bool, } impl Sum { @@ -52,19 +55,20 @@ impl Sum { /// /// Each sum is scoped by attributes and the aggregation cycle the measurements /// were made in. - pub(crate) fn new(monotonic: bool) -> Self { + pub(crate) fn new( + temporality: Temporality, + filter: AttributeSetFilter, + monotonic: bool, + ) -> Self { Sum { value_map: ValueMap::new(()), - monotonic, init_time: AggregateTimeInitiator::default(), + temporality, + filter, + monotonic, } } - pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { - // The argument index is not applicable to Sum. - self.value_map.measure(measurement, attrs); - } - pub(crate) fn delta( &self, dest: Option<&mut dyn Aggregation>, @@ -138,3 +142,26 @@ impl Sum { ) } } + +impl Measure for Arc> +where + T: Number, +{ + fn call(&self, measurement: T, attrs: &[KeyValue]) { + self.filter.apply(attrs, |filtered| { + self.value_map.measure(measurement, filtered); + }) + } +} + +impl ComputeAggregation for Arc> +where + T: Number, +{ + fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option>) { + match self.temporality { + Temporality::Delta => self.delta(dest), + _ => self.cumulative(dest), + } + } +} diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 292ab0b8f2..43bfd0912e 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -782,4 +782,134 @@ mod tests { "Metrics should be available in exporter." ); } + + async fn some_async_function() -> u64 { + // No dependency on any particular async runtime. + std::thread::sleep(std::time::Duration::from_millis(1)); + 1 + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn async_inside_observable_callback_from_tokio_multi_with_one_worker() { + async_inside_observable_callback_helper(); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn async_inside_observable_callback_from_tokio_multi_with_two_worker() { + async_inside_observable_callback_helper(); + } + + #[tokio::test(flavor = "current_thread")] + async fn async_inside_observable_callback_from_tokio_current_thread() { + async_inside_observable_callback_helper(); + } + + #[test] + fn async_inside_observable_callback_from_regular_main() { + async_inside_observable_callback_helper(); + } + + fn async_inside_observable_callback_helper() { + let interval = std::time::Duration::from_millis(10); + let exporter = InMemoryMetricExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()) + .with_interval(interval) + .build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter("test"); + let _gauge = meter + .u64_observable_gauge("my_observable_gauge") + .with_callback(|observer| { + // using futures_executor::block_on intentionally and avoiding + // any particular async runtime. + let value = futures_executor::block_on(some_async_function()); + observer.observe(value, &[]); + }) + .build(); + + meter_provider.force_flush().expect("flush should succeed"); + let exported_metrics = exporter + .get_finished_metrics() + .expect("this should not fail"); + assert!( + !exported_metrics.is_empty(), + "Metrics should be available in exporter." 
+        );
+    }
+
+    async fn some_tokio_async_function() -> u64 {
+        // Tokio-specific async function
+        tokio::time::sleep(Duration::from_millis(1)).await;
+        1
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+
+    async fn tokio_async_inside_observable_callback_from_tokio_multi_with_one_worker() {
+        tokio_async_inside_observable_callback_helper(true);
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn tokio_async_inside_observable_callback_from_tokio_multi_with_two_worker() {
+        tokio_async_inside_observable_callback_helper(true);
+    }
+
+    #[tokio::test(flavor = "current_thread")]
+    #[ignore] //TODO: Investigate if this can be fixed.
+    async fn tokio_async_inside_observable_callback_from_tokio_current_thread() {
+        tokio_async_inside_observable_callback_helper(true);
+    }
+
+    #[test]
+    fn tokio_async_inside_observable_callback_from_regular_main() {
+        tokio_async_inside_observable_callback_helper(false);
+    }
+
+    fn tokio_async_inside_observable_callback_helper(use_current_tokio_runtime: bool) {
+        let interval = std::time::Duration::from_millis(10);
+        let exporter = InMemoryMetricExporter::default();
+        let reader = PeriodicReader::builder(exporter.clone())
+            .with_interval(interval)
+            .build();
+
+        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
+        let meter = meter_provider.meter("test");
+
+        if use_current_tokio_runtime {
+            let rt = tokio::runtime::Handle::current().clone();
+            let _gauge = meter
+                .u64_observable_gauge("my_observable_gauge")
+                .with_callback(move |observer| {
+                    // call a Tokio-specific async function from here
+                    let value = rt.block_on(some_tokio_async_function());
+                    observer.observe(value, &[]);
+                })
+                .build();
+            // rt here is a handle to the current tokio runtime.
+            // Dropping it occurs when tokio::main itself ends.
+        } else {
+            let rt = tokio::runtime::Runtime::new().unwrap();
+            let _gauge = meter
+                .u64_observable_gauge("my_observable_gauge")
+                .with_callback(move |observer| {
+                    // call a Tokio-specific async function from here
+                    let value = rt.block_on(some_tokio_async_function());
+                    observer.observe(value, &[]);
+                })
+                .build();
+            // rt is not dropped here as it is moved into the closure,
+            // and is dropped only when the MeterProvider itself is dropped.
+            // This works when called from a normal main.
+        };
+
+        meter_provider.force_flush().expect("flush should succeed");
+        let exported_metrics = exporter
+            .get_finished_metrics()
+            .expect("this should not fail");
+        assert!(
+            !exported_metrics.is_empty(),
+            "Metrics should be available in exporter."
+        );
+    }
 }
diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs
index 8fb94c289f..5ba4bba75f 100644
--- a/opentelemetry-sdk/src/metrics/pipeline.rs
+++ b/opentelemetry-sdk/src/metrics/pipeline.rs
@@ -384,7 +384,7 @@ where
             .clone()
             .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);
 
-        let b = AggregateBuilder::new(Some(self.pipeline.reader.temporality(kind)), filter);
+        let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter);
         let (m, ca) = match aggregate_fn(b, &agg, kind) {
             Ok(Some((m, ca))) => (m, ca),
             other => return other.map(|fs| fs.map(|(m, _)| m)), // Drop aggregator or error
diff --git a/opentelemetry-stdout/Cargo.toml b/opentelemetry-stdout/Cargo.toml
index 40221a9d41..7061a5831b 100644
--- a/opentelemetry-stdout/Cargo.toml
+++ b/opentelemetry-stdout/Cargo.toml
@@ -34,7 +34,6 @@ futures-util = { workspace = true, optional = true }
 opentelemetry = { version = "0.27", path = "../opentelemetry" }
 opentelemetry_sdk = { version = "0.27", path = "../opentelemetry-sdk" }
 serde = { workspace = true, features = ["derive"] }
-serde_json = { workspace = true }
 ordered-float = { workspace = true }
 
 [dev-dependencies]
diff --git a/scripts/integration_tests.sh b/scripts/integration_tests.sh
index 07b9d472ba..b984cc023f 100755
--- a/scripts/integration_tests.sh
+++ b/scripts/integration_tests.sh
@@ -1,19 +1,36 @@
 set -e
+
 TEST_DIR="./opentelemetry-otlp/tests/integration_test/tests"
 
 if [ -d "$TEST_DIR" ]; then
     cd "$TEST_DIR"
+
     # Run tests with the grpc-tonic feature
-    cargo test --no-default-features --features "tonic-client" -- --ignored
+    echo
+    echo ####
+    echo Integration Tests: gRPC Tonic Client
+    echo ####
+    echo
+    cargo test --no-default-features --features "tonic-client","internal-logs"
 
     # Run tests with the reqwest-client feature
-    cargo test --no-default-features --features "reqwest-client" -- --ignored
+    echo
+    echo ####
+    echo Integration Tests: Reqwest Client
+    echo ####
+    echo
+    cargo test --no-default-features --features "reqwest-client","internal-logs"
 
     # TODO - Uncomment the following lines once the reqwest-blocking-client feature is working.
-    # cargo test --no-default-features --features "reqwest-blocking-client" -- --ignored
+    # cargo test --no-default-features --features "reqwest-blocking-client"
 
     # Run tests with the hyper-client feature
-    cargo test --no-default-features --features "hyper-client" -- --ignored
+    echo
+    echo ####
+    echo Integration Tests: Hyper Client
+    echo ####
+    echo
+    cargo test --no-default-features --features "hyper-client","internal-logs"
 else
     echo "Directory $TEST_DIR does not exist. Skipping tests."
     exit 1
diff --git a/scripts/test.sh b/scripts/test.sh
index dfcb925659..467d5f7c4a 100755
--- a/scripts/test.sh
+++ b/scripts/test.sh
@@ -2,15 +2,19 @@
 
 set -eu
 
+#
+# Using '--lib' skips integration tests
+#
+
 echo "Running tests for all packages in workspace with --all-features"
-cargo test --workspace --all-features
+cargo test --workspace --all-features --lib
 
 # See https://github.com/rust-lang/cargo/issues/5364
 echo "Running tests for opentelemetry package with --no-default-features"
-cargo test --manifest-path=opentelemetry/Cargo.toml --no-default-features
+cargo test --manifest-path=opentelemetry/Cargo.toml --no-default-features --lib
 
 # Run global tracer provider test in single thread
 # //TODO: These tests were not running for a while. Need to find out how to
 # run them. Using --ignored will run other tests as well, so that cannot be used.
# echo "Running global tracer provider for opentelemetry-sdk package with single thread."
-# cargo test --manifest-path=opentelemetry-sdk/Cargo.toml --all-features -- --test-threads=1
+# cargo test --manifest-path=opentelemetry-sdk/Cargo.toml --all-features --lib -- --test-threads=1
diff --git a/stress/Cargo.toml b/stress/Cargo.toml
index e8ebe6cd37..b4b86ba330 100644
--- a/stress/Cargo.toml
+++ b/stress/Cargo.toml
@@ -51,7 +51,6 @@ tracing = { workspace = true, features = ["std"]}
 tracing-subscriber = { workspace = true, features = ["registry", "std"] }
 num-format = "0.4.4"
 sysinfo = { version = "0.32", optional = true }
-libc = "0.2"
 async-trait = "0.1.51"
 futures-executor = { workspace = true }
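Finally, a standalone reduction of the observable-callback pattern the new `periodic_reader` tests exercise: `futures_executor::block_on` drives the async work inside `with_callback` without requiring any async runtime. Paths assume the SDK's in-memory test exporter used in those tests; the gauge name and value are illustrative.

```rust
use std::time::Duration;

use opentelemetry::metrics::MeterProvider as _;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::testing::metrics::InMemoryMetricExporter;

async fn read_value() -> u64 {
    // Stands in for real async work; deliberately runtime-agnostic.
    1
}

fn main() {
    let exporter = InMemoryMetricExporter::default();
    let reader = PeriodicReader::builder(exporter.clone())
        .with_interval(Duration::from_millis(10))
        .build();
    let provider = SdkMeterProvider::builder().with_reader(reader).build();

    let _gauge = provider
        .meter("example")
        .u64_observable_gauge("my_observable_gauge")
        .with_callback(|observer| {
            // block_on completes the future on this thread, so the callback
            // never depends on a tokio (or any other) runtime being present.
            let value = futures_executor::block_on(read_value());
            observer.observe(value, &[]);
        })
        .build();

    provider.force_flush().expect("flush should succeed");
    assert!(!exporter
        .get_finished_metrics()
        .expect("in-memory export should succeed")
        .is_empty());
}
```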