diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs index 7e898b6977fe..8586d79f779c 100644 --- a/src/servers/src/otlp.rs +++ b/src/servers/src/otlp.rs @@ -232,13 +232,13 @@ fn encode_sum( Ok(()) } -const HISTOGRAM_LE_COLUMN: &str = "le"; +const HISTOGRAM_LE_COLUMN: &str = "greptime_le"; /// Encode histogram data. This function returns 3 insert requests for 3 tables. /// /// The implementation has been following Prometheus histogram table format: /// -/// - A `%metric%_bucket` table including `le` tag that stores bucket upper +/// - A `%metric%_bucket` table including `greptime_le` tag that stores bucket upper /// limit, and `greptime_value` for bucket count /// - A `%metric%_sum` table storing sum of samples /// - A `%metric%_count` table storing count of samples. diff --git a/tests-integration/src/otlp.rs b/tests-integration/src/otlp.rs index 437d8256b49f..71a85d68606a 100644 --- a/tests-integration/src/otlp.rs +++ b/tests-integration/src/otlp.rs @@ -87,6 +87,66 @@ mod test { +------------+-------+--------------------+------------+---------------------+----------------+ | greptimedb | otel | java | testserver | 1970-01-01T00:00:00 | 105.0 | | greptimedb | otel | java | testsevrer | 1970-01-01T00:00:00 | 100.0 | ++------------+-------+--------------------+------------+---------------------+----------------+", + ); + + let mut output = instance + .do_query( + "SELECT greptime_le, greptime_value FROM my_test_histo_bucket order by greptime_le", + ctx.clone(), + ) + .await; + let output = output.remove(0).unwrap(); + let Output::Stream(stream) = output else { + unreachable!() + }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + dbg!(&recordbatches); + assert_eq!( + recordbatches.pretty_print().unwrap(), + "\ ++-------------+----------------+ +| greptime_le | greptime_value | ++-------------+----------------+ +| 1 | 1.0 | +| 5 | 3.0 | +| inf | 4.0 | ++-------------+----------------+", + ); + + let mut output = instance + .do_query("SELECT * FROM my_test_histo_sum", ctx.clone()) + .await; + let output = output.remove(0).unwrap(); + let Output::Stream(stream) = output else { + unreachable!() + }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + recordbatches.pretty_print().unwrap(), + "\ ++------------+-------+--------------------+------------+---------------------+----------------+ +| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value | ++------------+-------+--------------------+------------+---------------------+----------------+ +| greptimedb | otel | java | testserver | 1970-01-01T00:00:00 | 51.0 | ++------------+-------+--------------------+------------+---------------------+----------------+", + ); + + let mut output = instance + .do_query("SELECT * FROM my_test_histo_count", ctx.clone()) + .await; + let output = output.remove(0).unwrap(); + let Output::Stream(stream) = output else { + unreachable!() + }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + recordbatches.pretty_print().unwrap(), + "\ ++------------+-------+--------------------+------------+---------------------+----------------+ +| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value | ++------------+-------+--------------------+------------+---------------------+----------------+ +| greptimedb | otel | java | testserver | 1970-01-01T00:00:00 | 4.0 | +------------+-------+--------------------+------------+---------------------+----------------+", ); } @@ -108,15 +168,38 @@ mod test { ]; let gauge = Gauge { data_points }; + let histo_data_points = vec![HistogramDataPoint { + attributes: vec![keyvalue("host", "testserver")], + time_unix_nano: 100, + count: 4, + bucket_counts: vec![1, 2, 1], + explicit_bounds: vec![1.0f64, 5.0f64], + sum: Some(51f64), + ..Default::default() + }]; + + let histo = Histogram { + data_points: histo_data_points, + aggregation_temporality: 0, + }; + ExportMetricsServiceRequest { resource_metrics: vec![ResourceMetrics { scope_metrics: vec![ScopeMetrics { - metrics: vec![Metric { - name: "my.test.metric".into(), - description: "my ignored desc".into(), - unit: "my ignored unit".into(), - data: Some(metric::Data::Gauge(gauge)), - }], + metrics: vec![ + Metric { + name: "my.test.metric".into(), + description: "my ignored desc".into(), + unit: "my ignored unit".into(), + data: Some(metric::Data::Gauge(gauge)), + }, + Metric { + name: "my.test.histo".into(), + description: "my ignored desc".into(), + unit: "my ignored unit".into(), + data: Some(metric::Data::Histogram(histo)), + }, + ], scope: Some(InstrumentationScope { attributes: vec![ keyvalue("scope", "otel"),