diff --git a/Cargo.toml b/Cargo.toml
index 49f275710d4b..a8c675fbbfa2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -87,7 +87,7 @@ meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev =
 metrics = "0.20"
 moka = "0.12"
 once_cell = "1.18"
-opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] }
+opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics", "traces"] }
 parquet = "43.0"
 paste = "1.0"
 prost = "0.11"
diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs
index 29e0cfc44657..e5927399385b 100644
--- a/src/frontend/src/instance/otlp.rs
+++ b/src/frontend/src/instance/otlp.rs
@@ -19,14 +19,18 @@ use metrics::counter;
 use opentelemetry_proto::tonic::collector::metrics::v1::{
     ExportMetricsServiceRequest, ExportMetricsServiceResponse,
 };
+use opentelemetry_proto::tonic::collector::trace::v1::{
+    ExportTraceServiceRequest, ExportTraceServiceResponse,
+};
 use servers::error::{self, AuthSnafu, Result as ServerResult};
 use servers::otlp;
+use servers::otlp::plugin::TraceParserRef;
 use servers::query_handler::OpenTelemetryProtocolHandler;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
 
 use crate::instance::Instance;
-use crate::metrics::OTLP_METRICS_ROWS;
+use crate::metrics::{OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
 
 #[async_trait]
 impl OpenTelemetryProtocolHandler for Instance {
@@ -40,7 +44,7 @@ impl OpenTelemetryProtocolHandler for Instance {
             .as_ref()
             .check_permission(ctx.current_user(), PermissionReq::Otlp)
             .context(AuthSnafu)?;
-        let (requests, rows) = otlp::to_grpc_insert_requests(request)?;
+        let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
         let _ = self
             .handle_row_inserts(requests, ctx)
             .await
@@ -55,4 +59,40 @@ impl OpenTelemetryProtocolHandler for Instance {
         };
         Ok(resp)
     }
+
+    async fn traces(
+        &self,
+        request: ExportTraceServiceRequest,
+        ctx: QueryContextRef,
+    ) -> ServerResult<ExportTraceServiceResponse> {
+        self.plugins
+            .get::<PermissionCheckerRef>()
+            .as_ref()
+            .check_permission(ctx.current_user(), PermissionReq::Otlp)
+            .context(AuthSnafu)?;
+
+        let (table_name, spans) = match self.plugins.get::<TraceParserRef>() {
+            Some(parser) => (parser.table_name(), parser.parse(request)),
+            None => (
+                otlp::trace::TRACE_TABLE_NAME.to_string(),
+                otlp::trace::parse(request),
+            ),
+        };
+
+        let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?;
+
+        let _ = self
+            .handle_row_inserts(requests, ctx)
+            .await
+            .map_err(BoxedError::new)
+            .context(error::ExecuteGrpcQuerySnafu)?;
+
+        counter!(OTLP_TRACES_ROWS, rows as u64);
+
+        let resp = ExportTraceServiceResponse {
+            // TODO(fys): add support for partial_success in a future patch
+            partial_success: None,
+        };
+        Ok(resp)
+    }
 }
diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs
index 8a7480f9ba1b..b07bf2df9eb5 100644
--- a/src/frontend/src/metrics.rs
+++ b/src/frontend/src/metrics.rs
@@ -22,3 +22,4 @@ pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed"
 
 pub const PROM_STORE_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples";
 pub const OTLP_METRICS_ROWS: &str = "frontend.otlp.metrics.rows";
+pub const OTLP_TRACES_ROWS: &str = "frontend.otlp.traces.rows";
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index f00d4c07a271..884f79d48d85 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -660,6 +660,7 @@ impl HttpServer {
     fn route_otlp(&self, otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router {
         Router::new()
             .route("/v1/metrics", routing::post(otlp::metrics))
+            .route("/v1/traces", routing::post(otlp::traces))
             .with_state(otlp_handler)
     }
diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs
index 7d797d440fbf..b4ae4ea85473 100644
--- a/src/servers/src/http/otlp.rs
+++ b/src/servers/src/http/otlp.rs
@@ -21,6 +21,9 @@ use hyper::Body;
 use opentelemetry_proto::tonic::collector::metrics::v1::{
     ExportMetricsServiceRequest, ExportMetricsServiceResponse,
 };
+use opentelemetry_proto::tonic::collector::trace::v1::{
+    ExportTraceServiceRequest, ExportTraceServiceResponse,
+};
 use prost::Message;
 use session::context::QueryContextRef;
 use snafu::prelude::*;
@@ -33,16 +36,19 @@
 pub async fn metrics(
     State(handler): State<OpenTelemetryProtocolHandlerRef>,
     Extension(query_ctx): Extension<QueryContextRef>,
     RawBody(body): RawBody,
-) -> Result<OtlpResponse> {
+) -> Result<OtlpMetricsResponse> {
     let _timer = timer!(
-        crate::metrics::METRIC_HTTP_OPENTELEMETRY_ELAPSED,
+        crate::metrics::METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED,
         &[(crate::metrics::METRIC_DB_LABEL, query_ctx.get_db_string())]
     );
-    let request = parse_body(body).await?;
-    handler.metrics(request, query_ctx).await.map(OtlpResponse)
+    let request = parse_metrics_body(body).await?;
+    handler
+        .metrics(request, query_ctx)
+        .await
+        .map(OtlpMetricsResponse)
 }
 
-async fn parse_body(body: Body) -> Result<ExportMetricsServiceRequest> {
+async fn parse_metrics_body(body: Body) -> Result<ExportMetricsServiceRequest> {
     hyper::body::to_bytes(body)
         .await
         .context(error::HyperSnafu)
         .and_then(|buf| {
             ExportMetricsServiceRequest::decode(&buf[..]).context(error::DecodeOtlpRequestSnafu)
         })
 }
@@ -51,9 +57,47 @@
-pub struct OtlpResponse(ExportMetricsServiceResponse);
+pub struct OtlpMetricsResponse(ExportMetricsServiceResponse);
+
+impl IntoResponse for OtlpMetricsResponse {
+    fn into_response(self) -> axum::response::Response {
+        (
+            [(header::CONTENT_TYPE, "application/x-protobuf")],
+            self.0.encode_to_vec(),
+        )
+            .into_response()
+    }
+}
+
+#[axum_macros::debug_handler]
+pub async fn traces(
+    State(handler): State<OpenTelemetryProtocolHandlerRef>,
+    Extension(query_ctx): Extension<QueryContextRef>,
+    RawBody(body): RawBody,
+) -> Result<OtlpTracesResponse> {
+    let _timer = timer!(
+        crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED,
+        &[(crate::metrics::METRIC_DB_LABEL, query_ctx.get_db_string())]
+    );
+    let request = parse_traces_body(body).await?;
+    handler
+        .traces(request, query_ctx)
+        .await
+        .map(OtlpTracesResponse)
+}
+
+async fn parse_traces_body(body: Body) -> Result<ExportTraceServiceRequest> {
+    hyper::body::to_bytes(body)
+        .await
+        .context(error::HyperSnafu)
+        .and_then(|buf| {
+            ExportTraceServiceRequest::decode(&buf[..]).context(error::DecodeOtlpRequestSnafu)
+        })
+}
+
+pub struct OtlpTracesResponse(ExportTraceServiceResponse);
 
-impl IntoResponse for OtlpResponse {
+impl IntoResponse for OtlpTracesResponse {
     fn into_response(self) -> axum::response::Response {
         (
             [(header::CONTENT_TYPE, "application/x-protobuf")],
diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs
index d9e708cfcf31..6e9aee8ad1e4 100644
--- a/src/servers/src/metrics.rs
+++ b/src/servers/src/metrics.rs
@@ -37,7 +37,10 @@ pub(crate) const METRIC_HTTP_INFLUXDB_WRITE_ELAPSED: &str = "servers.http_influx
 pub(crate) const METRIC_HTTP_PROM_STORE_WRITE_ELAPSED: &str =
     "servers.http_prometheus_write_elapsed";
 pub(crate) const METRIC_HTTP_PROM_STORE_READ_ELAPSED: &str = "servers.http_prometheus_read_elapsed";
-pub(crate) const METRIC_HTTP_OPENTELEMETRY_ELAPSED: &str = "servers.http_otlp_elapsed";
+pub(crate) const METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED: &str =
+    "servers.http_otlp_metrics_elapsed";
+pub(crate) const METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED: &str =
+    "servers.http_otlp_traces_elapsed";
 pub(crate) const
METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED: &str = "servers.opentsdb_line_write_elapsed"; pub(crate) const METRIC_HTTP_PROMQL_INSTANT_QUERY_ELAPSED: &str = diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs index 3acfaf0a881b..0f35a7b39ef0 100644 --- a/src/servers/src/otlp.rs +++ b/src/servers/src/otlp.rs @@ -12,649 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::{RowInsertRequests, Value}; -use common_grpc::writer::Precision; -use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; -use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue}; -use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *}; - -use crate::error::Result; -use crate::row_writer::{self, MultiTableData, TableData}; +pub mod metrics; +pub mod plugin; +pub mod trace; const GREPTIME_TIMESTAMP: &str = "greptime_timestamp"; const GREPTIME_VALUE: &str = "greptime_value"; const GREPTIME_COUNT: &str = "greptime_count"; -/// the default column count for table writer -const APPROXIMATE_COLUMN_COUNT: usize = 8; - -/// Normalize otlp instrumentation, metric and attribute names -/// -/// -/// - since the name are case-insensitive, we transform them to lowercase for -/// better sql usability -/// - replace `.` and `-` with `_` -fn normalize_otlp_name(name: &str) -> String { - name.to_lowercase().replace(|c| c == '.' || c == '-', "_") -} - -/// Convert OpenTelemetry metrics to GreptimeDB insert requests -/// -/// See -/// -/// for data structure of OTLP metrics. -/// -/// Returns `InsertRequests` and total number of rows to ingest -pub fn to_grpc_insert_requests( - request: ExportMetricsServiceRequest, -) -> Result<(RowInsertRequests, usize)> { - let mut table_writer = MultiTableData::default(); - - for resource in &request.resource_metrics { - let resource_attrs = resource.resource.as_ref().map(|r| &r.attributes); - for scope in &resource.scope_metrics { - let scope_attrs = scope.scope.as_ref().map(|s| &s.attributes); - for metric in &scope.metrics { - encode_metrics(&mut table_writer, metric, resource_attrs, scope_attrs)?; - } - } - } - - Ok(table_writer.into_row_insert_requests()) -} - -fn encode_metrics( - table_writer: &mut MultiTableData, - metric: &Metric, - resource_attrs: Option<&Vec>, - scope_attrs: Option<&Vec>, -) -> Result<()> { - let name = &metric.name; - // note that we don't store description or unit, we might want to deal with - // these fields in the future. 
- if let Some(data) = &metric.data { - match data { - metric::Data::Gauge(gauge) => { - encode_gauge(table_writer, name, gauge, resource_attrs, scope_attrs)?; - } - metric::Data::Sum(sum) => { - encode_sum(table_writer, name, sum, resource_attrs, scope_attrs)?; - } - metric::Data::Summary(summary) => { - encode_summary(table_writer, name, summary, resource_attrs, scope_attrs)?; - } - metric::Data::Histogram(hist) => { - encode_histogram(table_writer, name, hist, resource_attrs, scope_attrs)?; - } - // TODO(sunng87) leave ExponentialHistogram for next release - metric::Data::ExponentialHistogram(_hist) => {} - } - } - - Ok(()) -} - -fn write_attributes( - writer: &mut TableData, - row: &mut Vec, - attrs: Option<&Vec>, -) -> Result<()> { - if let Some(attrs) = attrs { - let table_tags = attrs.iter().filter_map(|attr| { - if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) { - let key = normalize_otlp_name(&attr.key); - match val { - any_value::Value::StringValue(s) => Some((key, s.to_string())), - any_value::Value::IntValue(v) => Some((key, v.to_string())), - any_value::Value::DoubleValue(v) => Some((key, v.to_string())), - _ => None, // TODO(sunng87): allow different type of values - } - } else { - None - } - }); - - row_writer::write_tags(writer, table_tags, row)?; - } - Ok(()) -} - -fn write_timestamp(table: &mut TableData, row: &mut Vec, time_nano: i64) -> Result<()> { - row_writer::write_ts_precision( - table, - GREPTIME_TIMESTAMP, - Some(time_nano), - Precision::Nanosecond, - row, - ) -} - -fn write_data_point_value( - table: &mut TableData, - row: &mut Vec, - field: &str, - value: &Option, -) -> Result<()> { - match value { - Some(number_data_point::Value::AsInt(val)) => { - // we coerce all values to f64 - row_writer::write_f64(table, field, *val as f64, row)?; - } - Some(number_data_point::Value::AsDouble(val)) => { - row_writer::write_f64(table, field, *val, row)?; - } - _ => {} - } - Ok(()) -} - -fn write_tags_and_timestamp( - table: &mut TableData, - row: &mut Vec, - resource_attrs: Option<&Vec>, - scope_attrs: Option<&Vec>, - data_point_attrs: Option<&Vec>, - timestamp_nanos: i64, -) -> Result<()> { - write_attributes(table, row, resource_attrs)?; - write_attributes(table, row, scope_attrs)?; - write_attributes(table, row, data_point_attrs)?; - - write_timestamp(table, row, timestamp_nanos)?; - - Ok(()) -} - -/// encode this gauge metric -/// -/// note that there can be multiple data points in the request, it's going to be -/// stored as multiple rows -fn encode_gauge( - table_writer: &mut MultiTableData, - name: &str, - gauge: &Gauge, - resource_attrs: Option<&Vec>, - scope_attrs: Option<&Vec>, -) -> Result<()> { - let table = table_writer.get_or_default_table_data( - &normalize_otlp_name(name), - APPROXIMATE_COLUMN_COUNT, - gauge.data_points.len(), - ); - - for data_point in &gauge.data_points { - let mut row = table.alloc_one_row(); - write_tags_and_timestamp( - table, - &mut row, - resource_attrs, - scope_attrs, - Some(data_point.attributes.as_ref()), - data_point.time_unix_nano as i64, - )?; - - write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?; - table.add_row(row); - } - - Ok(()) -} - -/// encode this sum metric -/// -/// `aggregation_temporality` and `monotonic` are ignored for now -fn encode_sum( - table_writer: &mut MultiTableData, - name: &str, - sum: &Sum, - resource_attrs: Option<&Vec>, - scope_attrs: Option<&Vec>, -) -> Result<()> { - let table = table_writer.get_or_default_table_data( - &normalize_otlp_name(name), - 
APPROXIMATE_COLUMN_COUNT, - sum.data_points.len(), - ); - - for data_point in &sum.data_points { - let mut row = table.alloc_one_row(); - write_tags_and_timestamp( - table, - &mut row, - resource_attrs, - scope_attrs, - Some(data_point.attributes.as_ref()), - data_point.time_unix_nano as i64, - )?; - write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?; - table.add_row(row); - } - - Ok(()) -} - -const HISTOGRAM_LE_COLUMN: &str = "le"; - -/// Encode histogram data. This function returns 3 insert requests for 3 tables. -/// -/// The implementation has been following Prometheus histogram table format: -/// -/// - A `%metric%_bucket` table including `greptime_le` tag that stores bucket upper -/// limit, and `greptime_value` for bucket count -/// - A `%metric%_sum` table storing sum of samples -/// - A `%metric%_count` table storing count of samples. -/// -/// By its Prometheus compatibility, we hope to be able to use prometheus -/// quantile functions on this table. -fn encode_histogram( - table_writer: &mut MultiTableData, - name: &str, - hist: &Histogram, - resource_attrs: Option<&Vec>, - scope_attrs: Option<&Vec>, -) -> Result<()> { - let normalized_name = normalize_otlp_name(name); - - let bucket_table_name = format!("{}_bucket", normalized_name); - let sum_table_name = format!("{}_sum", normalized_name); - let count_table_name = format!("{}_count", normalized_name); - - let data_points_len = hist.data_points.len(); - // Note that the row and columns number here is approximate - let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3); - let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len); - let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len); - - for data_point in &hist.data_points { - let mut accumulated_count = 0; - for (idx, count) in data_point.bucket_counts.iter().enumerate() { - let mut bucket_row = bucket_table.alloc_one_row(); - write_tags_and_timestamp( - &mut bucket_table, - &mut bucket_row, - resource_attrs, - scope_attrs, - Some(data_point.attributes.as_ref()), - data_point.time_unix_nano as i64, - )?; - - if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) { - row_writer::write_tag( - &mut bucket_table, - HISTOGRAM_LE_COLUMN, - upper_bounds, - &mut bucket_row, - )?; - } else if idx == data_point.explicit_bounds.len() { - // The last bucket - row_writer::write_tag( - &mut bucket_table, - HISTOGRAM_LE_COLUMN, - f64::INFINITY, - &mut bucket_row, - )?; - } - - accumulated_count += count; - row_writer::write_f64( - &mut bucket_table, - GREPTIME_VALUE, - accumulated_count as f64, - &mut bucket_row, - )?; - - bucket_table.add_row(bucket_row); - } - - if let Some(sum) = data_point.sum { - let mut sum_row = sum_table.alloc_one_row(); - write_tags_and_timestamp( - &mut sum_table, - &mut sum_row, - resource_attrs, - scope_attrs, - Some(data_point.attributes.as_ref()), - data_point.time_unix_nano as i64, - )?; - - row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?; - sum_table.add_row(sum_row); - } - - let mut count_row = count_table.alloc_one_row(); - write_tags_and_timestamp( - &mut count_table, - &mut count_row, - resource_attrs, - scope_attrs, - Some(data_point.attributes.as_ref()), - data_point.time_unix_nano as i64, - )?; - - row_writer::write_f64( - &mut count_table, - GREPTIME_VALUE, - data_point.count as f64, - &mut count_row, - )?; - count_table.add_row(count_row); - } - - table_writer.add_table_data(bucket_table_name, bucket_table); - 
table_writer.add_table_data(sum_table_name, sum_table); - table_writer.add_table_data(count_table_name, count_table); - - Ok(()) -} - -#[allow(dead_code)] -fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> { - // TODO(sunng87): implement this using a prometheus compatible way - Ok(()) -} - -fn encode_summary( - table_writer: &mut MultiTableData, - name: &str, - summary: &Summary, - resource_attrs: Option<&Vec>, - scope_attrs: Option<&Vec>, -) -> Result<()> { - let table = table_writer.get_or_default_table_data( - &normalize_otlp_name(name), - APPROXIMATE_COLUMN_COUNT, - summary.data_points.len(), - ); - - for data_point in &summary.data_points { - let mut row = table.alloc_one_row(); - write_tags_and_timestamp( - table, - &mut row, - resource_attrs, - scope_attrs, - Some(data_point.attributes.as_ref()), - data_point.time_unix_nano as i64, - )?; - - for quantile in &data_point.quantile_values { - row_writer::write_f64( - table, - &format!("greptime_p{:02}", quantile.quantile * 100f64), - quantile.value, - &mut row, - )?; - } - - row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?; - table.add_row(row); - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use opentelemetry_proto::tonic::common::v1::any_value::Value as Val; - use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; - use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value; - use opentelemetry_proto::tonic::metrics::v1::summary_data_point::ValueAtQuantile; - use opentelemetry_proto::tonic::metrics::v1::{HistogramDataPoint, NumberDataPoint}; - - use super::*; - - #[test] - fn test_normalize_otlp_name() { - assert_eq!(normalize_otlp_name("jvm.memory.free"), "jvm_memory_free"); - assert_eq!(normalize_otlp_name("jvm-memory-free"), "jvm_memory_free"); - assert_eq!(normalize_otlp_name("jvm_memory_free"), "jvm_memory_free"); - assert_eq!(normalize_otlp_name("JVM_MEMORY_FREE"), "jvm_memory_free"); - assert_eq!(normalize_otlp_name("JVM_memory_FREE"), "jvm_memory_free"); - } - - fn keyvalue(key: &str, value: &str) -> KeyValue { - KeyValue { - key: key.into(), - value: Some(AnyValue { - value: Some(Val::StringValue(value.into())), - }), - } - } - - #[test] - fn test_encode_gauge() { - let mut tables = MultiTableData::default(); - - let data_points = vec![ - NumberDataPoint { - attributes: vec![keyvalue("host", "testsevrer")], - time_unix_nano: 100, - value: Some(Value::AsInt(100)), - ..Default::default() - }, - NumberDataPoint { - attributes: vec![keyvalue("host", "testserver")], - time_unix_nano: 105, - value: Some(Value::AsInt(105)), - ..Default::default() - }, - ]; - let gauge = Gauge { data_points }; - encode_gauge( - &mut tables, - "datamon", - &gauge, - Some(&vec![keyvalue("resource", "app")]), - Some(&vec![keyvalue("scope", "otel")]), - ) - .unwrap(); - - let table = tables.get_or_default_table_data("datamon", 0, 0); - assert_eq!(table.num_rows(), 2); - assert_eq!(table.num_columns(), 5); - assert_eq!( - table - .columns() - .iter() - .map(|c| &c.column_name) - .collect::>(), - vec![ - "resource", - "scope", - "host", - "greptime_timestamp", - "greptime_value" - ] - ); - } - - #[test] - fn test_encode_sum() { - let mut tables = MultiTableData::default(); - - let data_points = vec![ - NumberDataPoint { - attributes: vec![keyvalue("host", "testserver")], - time_unix_nano: 100, - value: Some(Value::AsInt(100)), - ..Default::default() - }, - NumberDataPoint { - attributes: vec![keyvalue("host", "testserver")], - time_unix_nano: 105, - value: 
Some(Value::AsInt(0)), - ..Default::default() - }, - ]; - let sum = Sum { - data_points, - ..Default::default() - }; - encode_sum( - &mut tables, - "datamon", - &sum, - Some(&vec![keyvalue("resource", "app")]), - Some(&vec![keyvalue("scope", "otel")]), - ) - .unwrap(); - - let table = tables.get_or_default_table_data("datamon", 0, 0); - assert_eq!(table.num_rows(), 2); - assert_eq!(table.num_columns(), 5); - assert_eq!( - table - .columns() - .iter() - .map(|c| &c.column_name) - .collect::>(), - vec![ - "resource", - "scope", - "host", - "greptime_timestamp", - "greptime_value" - ] - ); - } - - #[test] - fn test_encode_summary() { - let mut tables = MultiTableData::default(); - - let data_points = vec![SummaryDataPoint { - attributes: vec![keyvalue("host", "testserver")], - time_unix_nano: 100, - count: 25, - sum: 5400.0, - quantile_values: vec![ - ValueAtQuantile { - quantile: 0.90, - value: 1000.0, - }, - ValueAtQuantile { - quantile: 0.95, - value: 3030.0, - }, - ], - ..Default::default() - }]; - let summary = Summary { data_points }; - encode_summary( - &mut tables, - "datamon", - &summary, - Some(&vec![keyvalue("resource", "app")]), - Some(&vec![keyvalue("scope", "otel")]), - ) - .unwrap(); - - let table = tables.get_or_default_table_data("datamon", 0, 0); - assert_eq!(table.num_rows(), 1); - assert_eq!(table.num_columns(), 7); - assert_eq!( - table - .columns() - .iter() - .map(|c| &c.column_name) - .collect::>(), - vec![ - "resource", - "scope", - "host", - "greptime_timestamp", - "greptime_p90", - "greptime_p95", - "greptime_count" - ] - ); - } - - #[test] - fn test_encode_histogram() { - let mut tables = MultiTableData::default(); - - let data_points = vec![HistogramDataPoint { - attributes: vec![keyvalue("host", "testserver")], - time_unix_nano: 100, - start_time_unix_nano: 23, - count: 25, - sum: Some(100.), - max: Some(200.), - min: Some(0.03), - bucket_counts: vec![2, 4, 6, 9, 4], - explicit_bounds: vec![0.1, 1., 10., 100.], - ..Default::default() - }]; - - let histogram = Histogram { - data_points, - aggregation_temporality: AggregationTemporality::Delta.into(), - }; - encode_histogram( - &mut tables, - "histo", - &histogram, - Some(&vec![keyvalue("resource", "app")]), - Some(&vec![keyvalue("scope", "otel")]), - ) - .unwrap(); - - assert_eq!(3, tables.num_tables()); - - // bucket table - let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0); - assert_eq!(bucket_table.num_rows(), 5); - assert_eq!(bucket_table.num_columns(), 6); - assert_eq!( - bucket_table - .columns() - .iter() - .map(|c| &c.column_name) - .collect::>(), - vec![ - "resource", - "scope", - "host", - "greptime_timestamp", - "le", - "greptime_value", - ] - ); - - let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0); - assert_eq!(sum_table.num_rows(), 1); - assert_eq!(sum_table.num_columns(), 5); - assert_eq!( - sum_table - .columns() - .iter() - .map(|c| &c.column_name) - .collect::>(), - vec![ - "resource", - "scope", - "host", - "greptime_timestamp", - "greptime_value", - ] - ); - - let count_table = tables.get_or_default_table_data("histo_count", 0, 0); - assert_eq!(count_table.num_rows(), 1); - assert_eq!(count_table.num_columns(), 5); - assert_eq!( - count_table - .columns() - .iter() - .map(|c| &c.column_name) - .collect::>(), - vec![ - "resource", - "scope", - "host", - "greptime_timestamp", - "greptime_value", - ] - ); - } -} diff --git a/src/servers/src/otlp/metrics.rs b/src/servers/src/otlp/metrics.rs new file mode 100644 index 000000000000..cd7bbc7db81a --- /dev/null 
+++ b/src/servers/src/otlp/metrics.rs @@ -0,0 +1,658 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::{RowInsertRequests, Value}; +use common_grpc::writer::Precision; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue}; +use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *}; + +use super::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use crate::error::Result; +use crate::row_writer::{self, MultiTableData, TableData}; + +/// the default column count for table writer +const APPROXIMATE_COLUMN_COUNT: usize = 8; + +/// Normalize otlp instrumentation, metric and attribute names +/// +/// +/// - since the name are case-insensitive, we transform them to lowercase for +/// better sql usability +/// - replace `.` and `-` with `_` +fn normalize_otlp_name(name: &str) -> String { + name.to_lowercase().replace(|c| c == '.' || c == '-', "_") +} + +/// Convert OpenTelemetry metrics to GreptimeDB insert requests +/// +/// See +/// +/// for data structure of OTLP metrics. +/// +/// Returns `InsertRequests` and total number of rows to ingest +pub fn to_grpc_insert_requests( + request: ExportMetricsServiceRequest, +) -> Result<(RowInsertRequests, usize)> { + let mut table_writer = MultiTableData::default(); + + for resource in &request.resource_metrics { + let resource_attrs = resource.resource.as_ref().map(|r| &r.attributes); + for scope in &resource.scope_metrics { + let scope_attrs = scope.scope.as_ref().map(|s| &s.attributes); + for metric in &scope.metrics { + encode_metrics(&mut table_writer, metric, resource_attrs, scope_attrs)?; + } + } + } + + Ok(table_writer.into_row_insert_requests()) +} + +fn encode_metrics( + table_writer: &mut MultiTableData, + metric: &Metric, + resource_attrs: Option<&Vec>, + scope_attrs: Option<&Vec>, +) -> Result<()> { + let name = &metric.name; + // note that we don't store description or unit, we might want to deal with + // these fields in the future. 
+ if let Some(data) = &metric.data { + match data { + metric::Data::Gauge(gauge) => { + encode_gauge(table_writer, name, gauge, resource_attrs, scope_attrs)?; + } + metric::Data::Sum(sum) => { + encode_sum(table_writer, name, sum, resource_attrs, scope_attrs)?; + } + metric::Data::Summary(summary) => { + encode_summary(table_writer, name, summary, resource_attrs, scope_attrs)?; + } + metric::Data::Histogram(hist) => { + encode_histogram(table_writer, name, hist, resource_attrs, scope_attrs)?; + } + // TODO(sunng87) leave ExponentialHistogram for next release + metric::Data::ExponentialHistogram(_hist) => {} + } + } + + Ok(()) +} + +fn write_attributes( + writer: &mut TableData, + row: &mut Vec, + attrs: Option<&Vec>, +) -> Result<()> { + if let Some(attrs) = attrs { + let table_tags = attrs.iter().filter_map(|attr| { + if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) { + let key = normalize_otlp_name(&attr.key); + match val { + any_value::Value::StringValue(s) => Some((key, s.to_string())), + any_value::Value::IntValue(v) => Some((key, v.to_string())), + any_value::Value::DoubleValue(v) => Some((key, v.to_string())), + _ => None, // TODO(sunng87): allow different type of values + } + } else { + None + } + }); + + row_writer::write_tags(writer, table_tags, row)?; + } + Ok(()) +} + +fn write_timestamp(table: &mut TableData, row: &mut Vec, time_nano: i64) -> Result<()> { + row_writer::write_ts_precision( + table, + GREPTIME_TIMESTAMP, + Some(time_nano), + Precision::Nanosecond, + row, + ) +} + +fn write_data_point_value( + table: &mut TableData, + row: &mut Vec, + field: &str, + value: &Option, +) -> Result<()> { + match value { + Some(number_data_point::Value::AsInt(val)) => { + // we coerce all values to f64 + row_writer::write_f64(table, field, *val as f64, row)?; + } + Some(number_data_point::Value::AsDouble(val)) => { + row_writer::write_f64(table, field, *val, row)?; + } + _ => {} + } + Ok(()) +} + +fn write_tags_and_timestamp( + table: &mut TableData, + row: &mut Vec, + resource_attrs: Option<&Vec>, + scope_attrs: Option<&Vec>, + data_point_attrs: Option<&Vec>, + timestamp_nanos: i64, +) -> Result<()> { + write_attributes(table, row, resource_attrs)?; + write_attributes(table, row, scope_attrs)?; + write_attributes(table, row, data_point_attrs)?; + + write_timestamp(table, row, timestamp_nanos)?; + + Ok(()) +} + +/// encode this gauge metric +/// +/// note that there can be multiple data points in the request, it's going to be +/// stored as multiple rows +fn encode_gauge( + table_writer: &mut MultiTableData, + name: &str, + gauge: &Gauge, + resource_attrs: Option<&Vec>, + scope_attrs: Option<&Vec>, +) -> Result<()> { + let table = table_writer.get_or_default_table_data( + &normalize_otlp_name(name), + APPROXIMATE_COLUMN_COUNT, + gauge.data_points.len(), + ); + + for data_point in &gauge.data_points { + let mut row = table.alloc_one_row(); + write_tags_and_timestamp( + table, + &mut row, + resource_attrs, + scope_attrs, + Some(data_point.attributes.as_ref()), + data_point.time_unix_nano as i64, + )?; + + write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?; + table.add_row(row); + } + + Ok(()) +} + +/// encode this sum metric +/// +/// `aggregation_temporality` and `monotonic` are ignored for now +fn encode_sum( + table_writer: &mut MultiTableData, + name: &str, + sum: &Sum, + resource_attrs: Option<&Vec>, + scope_attrs: Option<&Vec>, +) -> Result<()> { + let table = table_writer.get_or_default_table_data( + &normalize_otlp_name(name), + 
APPROXIMATE_COLUMN_COUNT, + sum.data_points.len(), + ); + + for data_point in &sum.data_points { + let mut row = table.alloc_one_row(); + write_tags_and_timestamp( + table, + &mut row, + resource_attrs, + scope_attrs, + Some(data_point.attributes.as_ref()), + data_point.time_unix_nano as i64, + )?; + write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?; + table.add_row(row); + } + + Ok(()) +} + +const HISTOGRAM_LE_COLUMN: &str = "le"; + +/// Encode histogram data. This function returns 3 insert requests for 3 tables. +/// +/// The implementation has been following Prometheus histogram table format: +/// +/// - A `%metric%_bucket` table including `greptime_le` tag that stores bucket upper +/// limit, and `greptime_value` for bucket count +/// - A `%metric%_sum` table storing sum of samples +/// - A `%metric%_count` table storing count of samples. +/// +/// By its Prometheus compatibility, we hope to be able to use prometheus +/// quantile functions on this table. +fn encode_histogram( + table_writer: &mut MultiTableData, + name: &str, + hist: &Histogram, + resource_attrs: Option<&Vec>, + scope_attrs: Option<&Vec>, +) -> Result<()> { + let normalized_name = normalize_otlp_name(name); + + let bucket_table_name = format!("{}_bucket", normalized_name); + let sum_table_name = format!("{}_sum", normalized_name); + let count_table_name = format!("{}_count", normalized_name); + + let data_points_len = hist.data_points.len(); + // Note that the row and columns number here is approximate + let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3); + let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len); + let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len); + + for data_point in &hist.data_points { + let mut accumulated_count = 0; + for (idx, count) in data_point.bucket_counts.iter().enumerate() { + let mut bucket_row = bucket_table.alloc_one_row(); + write_tags_and_timestamp( + &mut bucket_table, + &mut bucket_row, + resource_attrs, + scope_attrs, + Some(data_point.attributes.as_ref()), + data_point.time_unix_nano as i64, + )?; + + if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) { + row_writer::write_tag( + &mut bucket_table, + HISTOGRAM_LE_COLUMN, + upper_bounds, + &mut bucket_row, + )?; + } else if idx == data_point.explicit_bounds.len() { + // The last bucket + row_writer::write_tag( + &mut bucket_table, + HISTOGRAM_LE_COLUMN, + f64::INFINITY, + &mut bucket_row, + )?; + } + + accumulated_count += count; + row_writer::write_f64( + &mut bucket_table, + GREPTIME_VALUE, + accumulated_count as f64, + &mut bucket_row, + )?; + + bucket_table.add_row(bucket_row); + } + + if let Some(sum) = data_point.sum { + let mut sum_row = sum_table.alloc_one_row(); + write_tags_and_timestamp( + &mut sum_table, + &mut sum_row, + resource_attrs, + scope_attrs, + Some(data_point.attributes.as_ref()), + data_point.time_unix_nano as i64, + )?; + + row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?; + sum_table.add_row(sum_row); + } + + let mut count_row = count_table.alloc_one_row(); + write_tags_and_timestamp( + &mut count_table, + &mut count_row, + resource_attrs, + scope_attrs, + Some(data_point.attributes.as_ref()), + data_point.time_unix_nano as i64, + )?; + + row_writer::write_f64( + &mut count_table, + GREPTIME_VALUE, + data_point.count as f64, + &mut count_row, + )?; + count_table.add_row(count_row); + } + + table_writer.add_table_data(bucket_table_name, bucket_table); + 
table_writer.add_table_data(sum_table_name, sum_table); + table_writer.add_table_data(count_table_name, count_table); + + Ok(()) +} + +#[allow(dead_code)] +fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> { + // TODO(sunng87): implement this using a prometheus compatible way + Ok(()) +} + +fn encode_summary( + table_writer: &mut MultiTableData, + name: &str, + summary: &Summary, + resource_attrs: Option<&Vec>, + scope_attrs: Option<&Vec>, +) -> Result<()> { + let table = table_writer.get_or_default_table_data( + &normalize_otlp_name(name), + APPROXIMATE_COLUMN_COUNT, + summary.data_points.len(), + ); + + for data_point in &summary.data_points { + let mut row = table.alloc_one_row(); + write_tags_and_timestamp( + table, + &mut row, + resource_attrs, + scope_attrs, + Some(data_point.attributes.as_ref()), + data_point.time_unix_nano as i64, + )?; + + for quantile in &data_point.quantile_values { + row_writer::write_f64( + table, + &format!("greptime_p{:02}", quantile.quantile * 100f64), + quantile.value, + &mut row, + )?; + } + + row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?; + table.add_row(row); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use opentelemetry_proto::tonic::common::v1::any_value::Value as Val; + use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; + use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value; + use opentelemetry_proto::tonic::metrics::v1::summary_data_point::ValueAtQuantile; + use opentelemetry_proto::tonic::metrics::v1::{HistogramDataPoint, NumberDataPoint}; + + use super::*; + + #[test] + fn test_normalize_otlp_name() { + assert_eq!(normalize_otlp_name("jvm.memory.free"), "jvm_memory_free"); + assert_eq!(normalize_otlp_name("jvm-memory-free"), "jvm_memory_free"); + assert_eq!(normalize_otlp_name("jvm_memory_free"), "jvm_memory_free"); + assert_eq!(normalize_otlp_name("JVM_MEMORY_FREE"), "jvm_memory_free"); + assert_eq!(normalize_otlp_name("JVM_memory_FREE"), "jvm_memory_free"); + } + + fn keyvalue(key: &str, value: &str) -> KeyValue { + KeyValue { + key: key.into(), + value: Some(AnyValue { + value: Some(Val::StringValue(value.into())), + }), + } + } + + #[test] + fn test_encode_gauge() { + let mut tables = MultiTableData::default(); + + let data_points = vec![ + NumberDataPoint { + attributes: vec![keyvalue("host", "testsevrer")], + time_unix_nano: 100, + value: Some(Value::AsInt(100)), + ..Default::default() + }, + NumberDataPoint { + attributes: vec![keyvalue("host", "testserver")], + time_unix_nano: 105, + value: Some(Value::AsInt(105)), + ..Default::default() + }, + ]; + let gauge = Gauge { data_points }; + encode_gauge( + &mut tables, + "datamon", + &gauge, + Some(&vec![keyvalue("resource", "app")]), + Some(&vec![keyvalue("scope", "otel")]), + ) + .unwrap(); + + let table = tables.get_or_default_table_data("datamon", 0, 0); + assert_eq!(table.num_rows(), 2); + assert_eq!(table.num_columns(), 5); + assert_eq!( + table + .columns() + .iter() + .map(|c| &c.column_name) + .collect::>(), + vec![ + "resource", + "scope", + "host", + "greptime_timestamp", + "greptime_value" + ] + ); + } + + #[test] + fn test_encode_sum() { + let mut tables = MultiTableData::default(); + + let data_points = vec![ + NumberDataPoint { + attributes: vec![keyvalue("host", "testserver")], + time_unix_nano: 100, + value: Some(Value::AsInt(100)), + ..Default::default() + }, + NumberDataPoint { + attributes: vec![keyvalue("host", "testserver")], + time_unix_nano: 105, + value: 
Some(Value::AsInt(0)), + ..Default::default() + }, + ]; + let sum = Sum { + data_points, + ..Default::default() + }; + encode_sum( + &mut tables, + "datamon", + &sum, + Some(&vec![keyvalue("resource", "app")]), + Some(&vec![keyvalue("scope", "otel")]), + ) + .unwrap(); + + let table = tables.get_or_default_table_data("datamon", 0, 0); + assert_eq!(table.num_rows(), 2); + assert_eq!(table.num_columns(), 5); + assert_eq!( + table + .columns() + .iter() + .map(|c| &c.column_name) + .collect::>(), + vec![ + "resource", + "scope", + "host", + "greptime_timestamp", + "greptime_value" + ] + ); + } + + #[test] + fn test_encode_summary() { + let mut tables = MultiTableData::default(); + + let data_points = vec![SummaryDataPoint { + attributes: vec![keyvalue("host", "testserver")], + time_unix_nano: 100, + count: 25, + sum: 5400.0, + quantile_values: vec![ + ValueAtQuantile { + quantile: 0.90, + value: 1000.0, + }, + ValueAtQuantile { + quantile: 0.95, + value: 3030.0, + }, + ], + ..Default::default() + }]; + let summary = Summary { data_points }; + encode_summary( + &mut tables, + "datamon", + &summary, + Some(&vec![keyvalue("resource", "app")]), + Some(&vec![keyvalue("scope", "otel")]), + ) + .unwrap(); + + let table = tables.get_or_default_table_data("datamon", 0, 0); + assert_eq!(table.num_rows(), 1); + assert_eq!(table.num_columns(), 7); + assert_eq!( + table + .columns() + .iter() + .map(|c| &c.column_name) + .collect::>(), + vec![ + "resource", + "scope", + "host", + "greptime_timestamp", + "greptime_p90", + "greptime_p95", + "greptime_count" + ] + ); + } + + #[test] + fn test_encode_histogram() { + let mut tables = MultiTableData::default(); + + let data_points = vec![HistogramDataPoint { + attributes: vec![keyvalue("host", "testserver")], + time_unix_nano: 100, + start_time_unix_nano: 23, + count: 25, + sum: Some(100.), + max: Some(200.), + min: Some(0.03), + bucket_counts: vec![2, 4, 6, 9, 4], + explicit_bounds: vec![0.1, 1., 10., 100.], + ..Default::default() + }]; + + let histogram = Histogram { + data_points, + aggregation_temporality: AggregationTemporality::Delta.into(), + }; + encode_histogram( + &mut tables, + "histo", + &histogram, + Some(&vec![keyvalue("resource", "app")]), + Some(&vec![keyvalue("scope", "otel")]), + ) + .unwrap(); + + assert_eq!(3, tables.num_tables()); + + // bucket table + let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0); + assert_eq!(bucket_table.num_rows(), 5); + assert_eq!(bucket_table.num_columns(), 6); + assert_eq!( + bucket_table + .columns() + .iter() + .map(|c| &c.column_name) + .collect::>(), + vec![ + "resource", + "scope", + "host", + "greptime_timestamp", + "le", + "greptime_value", + ] + ); + + let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0); + assert_eq!(sum_table.num_rows(), 1); + assert_eq!(sum_table.num_columns(), 5); + assert_eq!( + sum_table + .columns() + .iter() + .map(|c| &c.column_name) + .collect::>(), + vec![ + "resource", + "scope", + "host", + "greptime_timestamp", + "greptime_value", + ] + ); + + let count_table = tables.get_or_default_table_data("histo_count", 0, 0); + assert_eq!(count_table.num_rows(), 1); + assert_eq!(count_table.num_columns(), 5); + assert_eq!( + count_table + .columns() + .iter() + .map(|c| &c.column_name) + .collect::>(), + vec![ + "resource", + "scope", + "host", + "greptime_timestamp", + "greptime_value", + ] + ); + } +} diff --git a/src/servers/src/otlp/plugin.rs b/src/servers/src/otlp/plugin.rs new file mode 100644 index 000000000000..ddcb4375e6d8 --- /dev/null 
+++ b/src/servers/src/otlp/plugin.rs
@@ -0,0 +1,28 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
+
+use super::trace::TraceSpans;
+
+/// A `TraceParser` transforms an `ExportTraceServiceRequest` with custom logic,
+/// for example uplifting some fields from attributes (a map type) to columns.
+pub trait TraceParser: Send + Sync {
+    fn parse(&self, request: ExportTraceServiceRequest) -> TraceSpans;
+    fn table_name(&self) -> String;
+}
+
+pub type TraceParserRef = Arc<dyn TraceParser>;
diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs
new file mode 100644
index 000000000000..20ba773f3db1
--- /dev/null
+++ b/src/servers/src/otlp/trace.rs
@@ -0,0 +1,411 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use api::v1::value::ValueData;
+use api::v1::{ColumnDataType, RowInsertRequests};
+use common_grpc::writer::Precision;
+use common_time::time::Time;
+use itertools::Itertools;
+use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
+use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
+use opentelemetry_proto::tonic::common::v1::{
+    AnyValue, ArrayValue, InstrumentationScope, KeyValue, KeyValueList,
+};
+use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
+use opentelemetry_proto::tonic::trace::v1::{Span, Status};
+use serde_json::json;
+
+use super::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
+use crate::error::Result;
+use crate::row_writer::{self, MultiTableData, TableData};
+
+const APPROXIMATE_COLUMN_COUNT: usize = 24;
+pub const TRACE_TABLE_NAME: &str = "traces_preview_v01";
+
+#[derive(Debug, Clone)]
+#[allow(dead_code)]
+pub struct TraceSpan {
+    // the following are tags
+    pub trace_id: String,
+    pub span_id: String,
+    pub parent_span_id: String,
+
+    // the following are fields
+    pub resource_attributes: String, // TODO(yuanbohan): Map in the future
+    pub scope_name: String,
+    pub scope_version: String,
+    pub scope_attributes: String, // TODO(yuanbohan): Map in the future
+    pub trace_state: String,
+    pub span_name: String,
+    pub span_kind: String,
+    pub span_status_code: String,
+    pub span_status_message: String,
+    pub span_attributes: String, // TODO(yuanbohan): Map in the future
+    pub span_events: String,     // TODO(yuanbohan): List in the future
+    pub span_links: String,      // TODO(yuanbohan): List in the future
+    pub start_in_nanosecond: u64, // this is also the timestamp index
+    pub end_in_nanosecond: u64,
+
+    pub uplifted_fields: Vec<(String, ColumnDataType, ValueData)>,
+}
+
+pub type TraceSpans = Vec<TraceSpan>;
+
+/// Convert trace spans to GreptimeDB row insert requests.
+/// Returns `InsertRequests` and total number of rows to ingest +pub fn to_grpc_insert_requests( + table_name: String, + spans: TraceSpans, +) -> Result<(RowInsertRequests, usize)> { + let mut multi_table_writer = MultiTableData::default(); + let one_table_writer = multi_table_writer.get_or_default_table_data( + table_name, + APPROXIMATE_COLUMN_COUNT, + spans.len(), + ); + + for span in spans { + write_span_to_row(one_table_writer, span)?; + } + + Ok(multi_table_writer.into_row_insert_requests()) +} + +pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> { + let mut row = writer.alloc_one_row(); + { + // tags + let iter = vec![ + ("trace_id", span.trace_id), + ("span_id", span.span_id), + ("parent_span_id", span.parent_span_id), + ] + .into_iter() + .map(|(col, val)| (col.to_string(), val)); + row_writer::write_tags(writer, iter, &mut row)?; + } + { + // fields + let str_fields_iter = vec![ + ("resource_attributes", span.resource_attributes), + ("scope_name", span.scope_name), + ("scope_version", span.scope_version), + ("scope_attributes", span.scope_attributes), + ("trace_state", span.trace_state), + ("span_name", span.span_name), + ("span_kind", span.span_kind), + ("span_status_code", span.span_status_code), + ("span_status_message", span.span_status_message), + ("span_attributes", span.span_attributes), + ("span_events", span.span_events), + ("span_links", span.span_links), + ] + .into_iter() + .map(|(col, val)| { + ( + col.into(), + ColumnDataType::String, + ValueData::StringValue(val), + ) + }); + + let time_fields_iter = vec![ + ("start", span.start_in_nanosecond), + ("end", span.end_in_nanosecond), + ] + .into_iter() + .map(|(col, val)| { + ( + col.into(), + ColumnDataType::TimestampNanosecond, + ValueData::TimestampNanosecondValue(val as i64), + ) + }); + + row_writer::write_fields(writer, str_fields_iter, &mut row)?; + row_writer::write_fields(writer, time_fields_iter, &mut row)?; + row_writer::write_fields(writer, span.uplifted_fields.into_iter(), &mut row)?; + } + + row_writer::write_f64( + writer, + GREPTIME_VALUE, + (span.end_in_nanosecond - span.start_in_nanosecond) as f64 / 1_000_000.0, // duration in millisecond + &mut row, + )?; + row_writer::write_ts_precision( + writer, + GREPTIME_TIMESTAMP, + Some(span.start_in_nanosecond as i64), + Precision::Nanosecond, + &mut row, + )?; + + writer.add_row(row); + + Ok(()) +} + +pub fn parse_span( + resource_attrs: &[KeyValue], + scope: &InstrumentationScope, + span: Span, +) -> TraceSpan { + let (span_status_code, span_status_message) = status_to_string(&span.status); + let span_kind = span.kind().as_str_name().into(); + TraceSpan { + trace_id: bytes_to_hex_string(&span.trace_id), + span_id: bytes_to_hex_string(&span.span_id), + parent_span_id: bytes_to_hex_string(&span.parent_span_id), + + resource_attributes: vec_kv_to_string(resource_attrs), + trace_state: span.trace_state, + + scope_name: scope.name.clone(), + scope_version: scope.version.clone(), + scope_attributes: vec_kv_to_string(&scope.attributes), + + span_name: span.name, + span_kind, + span_status_code, + span_status_message, + span_attributes: vec_kv_to_string(&span.attributes), + span_events: events_to_string(&span.events), + span_links: links_to_string(&span.links), + + start_in_nanosecond: span.start_time_unix_nano, + end_in_nanosecond: span.end_time_unix_nano, + + uplifted_fields: vec![], + } +} + +/// Convert OpenTelemetry traces to SpanTraces +/// +/// See +/// +/// for data structure of OTLP traces. 
+pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
+    let mut spans = vec![];
+    for resource_spans in request.resource_spans {
+        let resource_attrs = resource_spans
+            .resource
+            .map(|r| r.attributes)
+            .unwrap_or_default();
+        for scope_spans in resource_spans.scope_spans {
+            let scope = scope_spans.scope.unwrap_or_default();
+            for span in scope_spans.spans {
+                spans.push(parse_span(&resource_attrs, &scope, span));
+            }
+        }
+    }
+    spans
+}
+
+pub fn bytes_to_hex_string(bs: &[u8]) -> String {
+    bs.iter().map(|b| format!("{:02x}", b)).join("")
+}
+
+pub fn arr_vals_to_string(arr: &ArrayValue) -> String {
+    let vs: Vec<String> = arr
+        .values
+        .iter()
+        .filter_map(|val| any_value_to_string(val.clone()))
+        .collect();
+
+    serde_json::to_string(&vs).unwrap_or_else(|_| "[]".into())
+}
+
+pub fn vec_kv_to_string(vec: &[KeyValue]) -> String {
+    let vs: HashMap<String, String> = vec
+        .iter()
+        .map(|kv| {
+            let val = kv
+                .value
+                .clone()
+                .and_then(any_value_to_string)
+                .unwrap_or_default();
+            (kv.key.clone(), val)
+        })
+        .collect();
+
+    serde_json::to_string(&vs).unwrap_or_else(|_| "{}".into())
+}
+
+pub fn kvlist_to_string(kvlist: &KeyValueList) -> String {
+    vec_kv_to_string(&kvlist.values)
+}
+
+pub fn any_value_to_string(val: AnyValue) -> Option<String> {
+    val.value.map(|value| match value {
+        OtlpValue::StringValue(s) => s,
+        OtlpValue::BoolValue(b) => b.to_string(),
+        OtlpValue::IntValue(i) => i.to_string(),
+        OtlpValue::DoubleValue(d) => d.to_string(),
+        OtlpValue::ArrayValue(arr) => arr_vals_to_string(&arr),
+        OtlpValue::KvlistValue(kv) => kvlist_to_string(&kv),
+        OtlpValue::BytesValue(bs) => bytes_to_hex_string(&bs),
+    })
+}
+
+pub fn event_to_string(event: &Event) -> String {
+    json!({
+        "name": event.name,
+        "time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
+        "attrs": vec_kv_to_string(&event.attributes),
+    })
+    .to_string()
+}
+
+pub fn events_to_string(events: &[Event]) -> String {
+    let v: Vec<String> = events.iter().map(event_to_string).collect();
+    serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
+}
+
+pub fn link_to_string(link: &Link) -> String {
+    json!({
+        "trace_id": link.trace_id,
+        "span_id": link.span_id,
+        "trace_state": link.trace_state,
+        "attributes": vec_kv_to_string(&link.attributes),
+    })
+    .to_string()
+}
+
+pub fn links_to_string(links: &[Link]) -> String {
+    let v: Vec<String> = links.iter().map(link_to_string).collect();
+    serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
+}
+
+pub fn status_to_string(status: &Option<Status>) -> (String, String) {
+    match status {
+        Some(status) => (status.code().as_str_name().into(), status.message.clone()),
+        None => ("".into(), "".into()),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use common_time::time::Time;
+    use opentelemetry_proto::tonic::common::v1::{
+        any_value, AnyValue, ArrayValue, KeyValue, KeyValueList,
+    };
+    use opentelemetry_proto::tonic::trace::v1::span::Event;
+    use opentelemetry_proto::tonic::trace::v1::Status;
+    use serde_json::json;
+
+    use crate::otlp::trace::{
+        arr_vals_to_string, bytes_to_hex_string, event_to_string, kvlist_to_string,
+        status_to_string, vec_kv_to_string,
+    };
+
+    #[test]
+    fn test_bytes_to_hex_string() {
+        assert_eq!(
+            "24fe79948641b110a29bc27859307e8d",
+            bytes_to_hex_string(&[
+                36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141,
+            ])
+        );
+
+        assert_eq!(
+            "baffeedd7b8debc0",
+            bytes_to_hex_string(&[186, 255, 238, 221, 123, 141, 235, 192,])
+        );
+    }
+
+    #[test]
+    fn test_arr_vals_to_string() {
+        assert_eq!("[]", arr_vals_to_string(&ArrayValue { values: vec![] }));
+
+        let arr = ArrayValue {
+            values: vec![
+                AnyValue {
+                    value: Some(any_value::Value::StringValue("string_value".into())),
+                },
+                AnyValue {
+                    value: Some(any_value::Value::BoolValue(true)),
+                },
+                AnyValue {
+                    value: Some(any_value::Value::IntValue(1)),
+                },
+                AnyValue {
+                    value: Some(any_value::Value::DoubleValue(1.2)),
+                },
+            ],
+        };
+        let expect = json!(["string_value", "true", "1", "1.2"]).to_string();
+        assert_eq!(expect, arr_vals_to_string(&arr));
+    }
+
+    #[test]
+    fn test_kv_list_to_string() {
+        let kvlist = KeyValueList {
+            values: vec![KeyValue {
+                key: "str_key".into(),
+                value: Some(AnyValue {
+                    value: Some(any_value::Value::StringValue("val1".into())),
+                }),
+            }],
+        };
+        let expect = json!({
+            "str_key": "val1",
+        })
+        .to_string();
+        assert_eq!(expect, kvlist_to_string(&kvlist))
+    }
+
+    #[test]
+    fn test_event_to_string() {
+        let attributes = vec![KeyValue {
+            key: "str_key".into(),
+            value: Some(AnyValue {
+                value: Some(any_value::Value::StringValue("val1".into())),
+            }),
+        }];
+        let event = Event {
+            time_unix_nano: 1697620662450128000_u64,
+            name: "event_name".into(),
+            attributes,
+            dropped_attributes_count: 0,
+        };
+        let event_string = event_to_string(&event);
+        let expect = json!({
+            "name": event.name,
+            "time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
+            "attrs": vec_kv_to_string(&event.attributes),
+        });
+
+        assert_eq!(
+            expect,
+            serde_json::from_str::<serde_json::Value>(event_string.as_str()).unwrap()
+        );
+    }
+
+    #[test]
+    fn test_status_to_string() {
+        let message = String::from("status message");
+        let status = Status {
+            code: 1,
+            message: message.clone(),
+        };
+
+        assert_eq!(
+            ("STATUS_CODE_OK".into(), message),
+            status_to_string(&Some(status)),
+        );
+    }
+}
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs
index 4180c8469c8b..06ef6bbda5c0 100644
--- a/src/servers/src/query_handler.rs
+++ b/src/servers/src/query_handler.rs
@@ -34,6 +34,9 @@ use common_query::Output;
 use opentelemetry_proto::tonic::collector::metrics::v1::{
     ExportMetricsServiceRequest, ExportMetricsServiceResponse,
 };
+use opentelemetry_proto::tonic::collector::trace::v1::{
+    ExportTraceServiceRequest, ExportTraceServiceResponse,
+};
 use session::context::QueryContextRef;
 
 use crate::error::Result;
@@ -101,4 +104,11 @@ pub trait OpenTelemetryProtocolHandler {
         request: ExportMetricsServiceRequest,
         ctx: QueryContextRef,
     ) -> Result<ExportMetricsServiceResponse>;
+
+    /// Handle an OpenTelemetry traces request
+    async fn traces(
+        &self,
+        request: ExportTraceServiceRequest,
+        ctx: QueryContextRef,
+    ) -> Result<ExportTraceServiceResponse>;
 }
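
The `TraceParser` plugin added in `src/servers/src/otlp/plugin.rs` is the extension point of this change: when a `TraceParserRef` is registered, `Instance::traces` uses it to choose the target table and convert the request; otherwise it falls back to `otlp::trace::parse` and `TRACE_TABLE_NAME`. Below is a minimal sketch of a custom parser. The `MyTraceParser` name, the uplifted column, and the table name are hypothetical, and how the `TraceParserRef` ends up in the host's plugin registry is outside this diff.

```rust
use std::sync::Arc;

use api::v1::value::ValueData;
use api::v1::ColumnDataType;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use servers::otlp::plugin::{TraceParser, TraceParserRef};
use servers::otlp::trace::{self, TraceSpans};

struct MyTraceParser; // hypothetical custom parser

impl TraceParser for MyTraceParser {
    fn parse(&self, request: ExportTraceServiceRequest) -> TraceSpans {
        // Reuse the default conversion, then promote a value into a
        // first-class column through `uplifted_fields`.
        let mut spans = trace::parse(request);
        for span in &mut spans {
            span.uplifted_fields.push((
                "kind".to_string(), // hypothetical column name
                ColumnDataType::String,
                ValueData::StringValue(span.span_kind.clone()),
            ));
        }
        spans
    }

    fn table_name(&self) -> String {
        "my_traces".to_string() // hypothetical table name
    }
}

// The frontend only needs an `Arc`-ed trait object:
fn make_parser() -> TraceParserRef {
    Arc::new(MyTraceParser)
}
```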
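On the wire, the new `/v1/traces` route accepts the same encoding as `/v1/metrics`: a protobuf-encoded request in a POST body, answered with `application/x-protobuf`. A minimal sketch of a client for exercising the endpoint, assuming the `reqwest` crate (not part of this diff) and a hypothetical `endpoint` such as `http://127.0.0.1:4000`:

```rust
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use prost::Message;

async fn send_traces(
    endpoint: &str,
    request: ExportTraceServiceRequest,
) -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    client
        // Hits the route registered in `route_otlp` above.
        .post(format!("{endpoint}/v1/traces"))
        .header("content-type", "application/x-protobuf")
        // `parse_traces_body` on the server decodes exactly this encoding.
        .body(request.encode_to_vec())
        .send()
        .await?
        .error_for_status()?; // surface non-2xx responses as errors
    Ok(())
}
```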