diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs
index 9c9b8af48bc1..29e0cfc44657 100644
--- a/src/frontend/src/instance/otlp.rs
+++ b/src/frontend/src/instance/otlp.rs
@@ -42,7 +42,7 @@ impl OpenTelemetryProtocolHandler for Instance {
             .context(AuthSnafu)?;
         let (requests, rows) = otlp::to_grpc_insert_requests(request)?;
         let _ = self
-            .handle_inserts(requests, ctx)
+            .handle_row_inserts(requests, ctx)
            .await
            .map_err(BoxedError::new)
            .context(error::ExecuteGrpcQuerySnafu)?;
diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs
index 7db8a03b2d96..9ce78be2632c 100644
--- a/src/servers/src/influxdb.rs
+++ b/src/servers/src/influxdb.rs
@@ -53,7 +53,7 @@ impl TryFrom<InfluxdbRequest> for RowInsertRequests {
 
             // tags
             if let Some(tags) = tags {
-                let kvs = tags.iter().map(|(k, v)| (k.as_str(), v.as_str()));
+                let kvs = tags.iter().map(|(k, v)| (k.to_string(), v.as_str()));
                 row_writer::write_tags(table_data, kvs, &mut one_row)?;
             }
 
@@ -69,7 +69,7 @@ impl TryFrom<InfluxdbRequest> for RowInsertRequests {
                     ),
                     FieldValue::Boolean(v) => (ColumnDataType::Boolean, ValueData::BoolValue(*v)),
                 };
-                (k.as_str(), datatype, value)
+                (k.to_string(), datatype, value)
             });
 
             row_writer::write_fields(table_data, fields, &mut one_row)?;
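[Reviewer note, not part of the patch] The `as_str()` to `to_string()` changes above exist because `TableData` now owns its column-index keys (see the src/servers/src/row_writer.rs hunks later in this diff), so callers hand over owned `String` keys instead of borrowed `&str`. A minimal sketch of the row-writer flow that the InfluxDB, Prometheus and OTLP paths now share; the table name `my_metric` and the tag values are made up for illustration:

    use api::v1::RowInsertRequests;
    use crate::error::Result;
    use crate::row_writer::{self, MultiTableData};

    fn sketch() -> Result<(RowInsertRequests, usize)> {
        let mut tables = MultiTableData::default();
        // Capacity hints only: approximate column and row counts.
        let table = tables.get_or_default_table_data("my_metric", 4, 1);

        let mut row = table.alloc_one_row();
        // Owned String keys; values may be any ToString type.
        row_writer::write_tags(table, std::iter::once(("host".to_string(), "web-1")), &mut row)?;
        row_writer::write_f64(table, "greptime_value", 42.0, &mut row)?;
        row_writer::write_ts_millis(table, "greptime_timestamp", Some(0), &mut row)?;
        table.add_row(row);

        Ok(tables.into_row_insert_requests())
    }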
diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs
index 8af21fee7b66..3acfaf0a881b 100644
--- a/src/servers/src/otlp.rs
+++ b/src/servers/src/otlp.rs
@@ -12,17 +12,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::{InsertRequest, InsertRequests};
-use common_grpc::writer::{LinesWriter, Precision};
+use api::v1::{RowInsertRequests, Value};
+use common_grpc::writer::Precision;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue};
 use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *};
-use snafu::ResultExt;
 
-use crate::error::{self, Result};
+use crate::error::Result;
+use crate::row_writer::{self, MultiTableData, TableData};
 
 const GREPTIME_TIMESTAMP: &str = "greptime_timestamp";
 const GREPTIME_VALUE: &str = "greptime_value";
+const GREPTIME_COUNT: &str = "greptime_count";
+
+/// the default column count for table writer
+const APPROXIMATE_COLUMN_COUNT: usize = 8;
 
 /// Normalize otlp instrumentation, metric and attribute names
 ///
@@ -43,313 +46,350 @@ fn normalize_otlp_name(name: &str) -> String {
 /// Returns `InsertRequests` and total number of rows to ingest
 pub fn to_grpc_insert_requests(
     request: ExportMetricsServiceRequest,
-) -> Result<(InsertRequests, usize)> {
-    let mut insert_batch = Vec::new();
-    let mut rows = 0;
-
-    for resource in request.resource_metrics {
-        let resource_attrs = resource.resource.map(|r| r.attributes);
-        for scope in resource.scope_metrics {
-            let scope_attrs = scope.scope.map(|s| s.attributes);
-            for metric in scope.metrics {
-                if let Some(insert) =
-                    encode_metrics(&metric, resource_attrs.as_ref(), scope_attrs.as_ref())?
-                {
-                    rows += insert.row_count;
-                    insert_batch.push(insert);
-                }
+) -> Result<(RowInsertRequests, usize)> {
+    let mut table_writer = MultiTableData::default();
+
+    for resource in &request.resource_metrics {
+        let resource_attrs = resource.resource.as_ref().map(|r| &r.attributes);
+        for scope in &resource.scope_metrics {
+            let scope_attrs = scope.scope.as_ref().map(|s| &s.attributes);
+            for metric in &scope.metrics {
+                encode_metrics(&mut table_writer, metric, resource_attrs, scope_attrs)?;
             }
         }
     }
 
-    let inserts = InsertRequests {
-        inserts: insert_batch,
-    };
-
-    Ok((inserts, rows as usize))
+    Ok(table_writer.into_row_insert_requests())
 }
 
 fn encode_metrics(
+    table_writer: &mut MultiTableData,
     metric: &Metric,
     resource_attrs: Option<&Vec<KeyValue>>,
     scope_attrs: Option<&Vec<KeyValue>>,
-) -> Result<Option<InsertRequest>> {
+) -> Result<()> {
     let name = &metric.name;
     // note that we don't store description or unit, we might want to deal with
     // these fields in the future.
     if let Some(data) = &metric.data {
         match data {
             metric::Data::Gauge(gauge) => {
-                encode_gauge(name, gauge, resource_attrs, scope_attrs).map(Some)
+                encode_gauge(table_writer, name, gauge, resource_attrs, scope_attrs)?;
+            }
+            metric::Data::Sum(sum) => {
+                encode_sum(table_writer, name, sum, resource_attrs, scope_attrs)?;
             }
-            metric::Data::Sum(sum) => encode_sum(name, sum, resource_attrs, scope_attrs).map(Some),
             metric::Data::Summary(summary) => {
-                encode_summary(name, summary, resource_attrs, scope_attrs).map(Some)
+                encode_summary(table_writer, name, summary, resource_attrs, scope_attrs)?;
+            }
+            metric::Data::Histogram(hist) => {
+                encode_histogram(table_writer, name, hist, resource_attrs, scope_attrs)?;
             }
-            // TODO(sunng87) leave histogram for next release
-            metric::Data::Histogram(_hist) => Ok(None),
-            metric::Data::ExponentialHistogram(_hist) => Ok(None),
+            // TODO(sunng87) leave ExponentialHistogram for next release
+            metric::Data::ExponentialHistogram(_hist) => {}
         }
-    } else {
-        Ok(None)
     }
-}
 
-fn write_attributes(lines: &mut LinesWriter, attrs: Option<&Vec<KeyValue>>) -> Result<()> {
-    if let Some(attrs) = attrs {
-        for attr in attrs {
-            write_attribute(lines, attr)?;
-        }
-    }
     Ok(())
 }
 
-fn write_attribute(lines: &mut LinesWriter, attr: &KeyValue) -> Result<()> {
-    if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) {
-        match val {
-            any_value::Value::StringValue(s) => lines
-                .write_tag(&normalize_otlp_name(&attr.key), s)
-                .context(error::OtlpMetricsWriteSnafu)?,
-
-            any_value::Value::IntValue(v) => lines
-                .write_tag(&normalize_otlp_name(&attr.key), &v.to_string())
-                .context(error::OtlpMetricsWriteSnafu)?,
-            any_value::Value::DoubleValue(v) => lines
-                .write_tag(&normalize_otlp_name(&attr.key), &v.to_string())
-                .context(error::OtlpMetricsWriteSnafu)?,
-            // TODO(sunng87): allow different type of values
-            _ => {}
-        }
-    }
+fn write_attributes(
+    writer: &mut TableData,
+    row: &mut Vec<Value>,
+    attrs: Option<&Vec<KeyValue>>,
+) -> Result<()> {
+    if let Some(attrs) = attrs {
+        let table_tags = attrs.iter().filter_map(|attr| {
+            if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) {
+                let key = normalize_otlp_name(&attr.key);
+                match val {
+                    any_value::Value::StringValue(s) => Some((key, s.to_string())),
+                    any_value::Value::IntValue(v) => Some((key, v.to_string())),
+                    any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
+                    _ => None, // TODO(sunng87): allow different type of values
+                }
+            } else {
+                None
+            }
+        });
+        row_writer::write_tags(writer, table_tags, row)?;
+    }
     Ok(())
 }
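[Reviewer note, not part of the patch] `write_attributes` above flattens all three OTLP attribute scopes (resource, scope and data point) into plain tag columns: keys pass through `normalize_otlp_name`, int and double values are stringified, and array, kvlist and bytes values are silently dropped by the `_ => None` arm. For illustration, a `KeyValue` like the ones the test module later builds with its `keyvalue` helper:

    use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, KeyValue};

    // Hypothetical constructor, mirroring the tests' `keyvalue` helper.
    fn keyvalue(key: &str, value: &str) -> KeyValue {
        KeyValue {
            key: key.into(),
            value: Some(AnyValue {
                value: Some(any_value::Value::StringValue(value.into())),
            }),
        }
    }

    // keyvalue("host", "web-1") becomes a `host` tag column holding "web-1".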
-fn write_timestamp(lines: &mut LinesWriter, time_nano: i64) -> Result<()> {
-    lines
-        .write_ts(GREPTIME_TIMESTAMP, (time_nano, Precision::Nanosecond))
-        .context(error::OtlpMetricsWriteSnafu)?;
-    Ok(())
+fn write_timestamp(table: &mut TableData, row: &mut Vec<Value>, time_nano: i64) -> Result<()> {
+    row_writer::write_ts_precision(
+        table,
+        GREPTIME_TIMESTAMP,
+        Some(time_nano),
+        Precision::Nanosecond,
+        row,
+    )
 }
 
 fn write_data_point_value(
-    lines: &mut LinesWriter,
+    table: &mut TableData,
+    row: &mut Vec<Value>,
     field: &str,
     value: &Option<number_data_point::Value>,
 ) -> Result<()> {
     match value {
         Some(number_data_point::Value::AsInt(val)) => {
             // we coerce all values to f64
-            lines
-                .write_f64(field, *val as f64)
-                .context(error::OtlpMetricsWriteSnafu)?
+            row_writer::write_f64(table, field, *val as f64, row)?;
+        }
+        Some(number_data_point::Value::AsDouble(val)) => {
+            row_writer::write_f64(table, field, *val, row)?;
         }
-        Some(number_data_point::Value::AsDouble(val)) => lines
-            .write_f64(field, *val)
-            .context(error::OtlpMetricsWriteSnafu)?,
         _ => {}
     }
     Ok(())
 }
 
+fn write_tags_and_timestamp(
+    table: &mut TableData,
+    row: &mut Vec<Value>,
+    resource_attrs: Option<&Vec<KeyValue>>,
+    scope_attrs: Option<&Vec<KeyValue>>,
+    data_point_attrs: Option<&Vec<KeyValue>>,
+    timestamp_nanos: i64,
+) -> Result<()> {
+    write_attributes(table, row, resource_attrs)?;
+    write_attributes(table, row, scope_attrs)?;
+    write_attributes(table, row, data_point_attrs)?;
+
+    write_timestamp(table, row, timestamp_nanos)?;
+
+    Ok(())
+}
+
 /// encode this gauge metric
 ///
 /// note that there can be multiple data points in the request, it's going to be
 /// stored as multiple rows
 fn encode_gauge(
+    table_writer: &mut MultiTableData,
     name: &str,
     gauge: &Gauge,
     resource_attrs: Option<&Vec<KeyValue>>,
     scope_attrs: Option<&Vec<KeyValue>>,
-) -> Result<InsertRequest> {
-    let mut lines = LinesWriter::with_lines(gauge.data_points.len());
-    for data_point in &gauge.data_points {
-        write_attributes(&mut lines, resource_attrs)?;
-        write_attributes(&mut lines, scope_attrs)?;
-        write_attributes(&mut lines, Some(data_point.attributes.as_ref()))?;
-        write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
-        write_data_point_value(&mut lines, GREPTIME_VALUE, &data_point.value)?;
+) -> Result<()> {
+    let table = table_writer.get_or_default_table_data(
+        &normalize_otlp_name(name),
+        APPROXIMATE_COLUMN_COUNT,
+        gauge.data_points.len(),
+    );
 
-        lines.commit();
+    for data_point in &gauge.data_points {
+        let mut row = table.alloc_one_row();
+        write_tags_and_timestamp(
+            table,
+            &mut row,
+            resource_attrs,
+            scope_attrs,
+            Some(data_point.attributes.as_ref()),
+            data_point.time_unix_nano as i64,
+        )?;
+
+        write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
+        table.add_row(row);
     }
 
-    let (columns, row_count) = lines.finish();
-    Ok(InsertRequest {
-        table_name: normalize_otlp_name(name),
-        columns,
-        row_count,
-    })
+    Ok(())
 }
 /// encode this sum metric
 ///
 /// `aggregation_temporality` and `monotonic` are ignored for now
 fn encode_sum(
+    table_writer: &mut MultiTableData,
     name: &str,
     sum: &Sum,
     resource_attrs: Option<&Vec<KeyValue>>,
     scope_attrs: Option<&Vec<KeyValue>>,
-) -> Result<InsertRequest> {
-    let mut lines = LinesWriter::with_lines(sum.data_points.len());
+) -> Result<()> {
+    let table = table_writer.get_or_default_table_data(
+        &normalize_otlp_name(name),
+        APPROXIMATE_COLUMN_COUNT,
+        sum.data_points.len(),
+    );
 
     for data_point in &sum.data_points {
-        write_attributes(&mut lines, resource_attrs)?;
-        write_attributes(&mut lines, scope_attrs)?;
-        write_attributes(&mut lines, Some(data_point.attributes.as_ref()))?;
-
-        write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
-
-        write_data_point_value(&mut lines, GREPTIME_VALUE, &data_point.value)?;
-
-        lines.commit();
+        let mut row = table.alloc_one_row();
+        write_tags_and_timestamp(
+            table,
+            &mut row,
+            resource_attrs,
+            scope_attrs,
+            Some(data_point.attributes.as_ref()),
+            data_point.time_unix_nano as i64,
+        )?;
+        write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
+        table.add_row(row);
     }
 
-    let (columns, row_count) = lines.finish();
-    Ok(InsertRequest {
-        table_name: normalize_otlp_name(name),
-        columns,
-        row_count,
-    })
+    Ok(())
 }
 
-// TODO(sunng87): we may need better implementation for histogram
-#[allow(dead_code)]
-fn encode_histogram(name: &str, hist: &Histogram) -> Result<InsertRequest> {
-    let mut lines = LinesWriter::with_lines(hist.data_points.len());
-
-    for data_point in &hist.data_points {
-        for attr in &data_point.attributes {
-            write_attribute(&mut lines, attr)?;
-        }
-
-        write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
-
-        for (idx, count) in data_point.bucket_counts.iter().enumerate() {
-            // here we don't store bucket boundary
-            lines
-                .write_u64(&format!("bucket_{}", idx), *count)
-                .context(error::OtlpMetricsWriteSnafu)?;
-        }
-
-        if let Some(min) = data_point.min {
-            lines
-                .write_f64("min", min)
-                .context(error::OtlpMetricsWriteSnafu)?;
-        }
+
+const HISTOGRAM_LE_COLUMN: &str = "le";
 
-        if let Some(max) = data_point.max {
-            lines
-                .write_f64("max", max)
-                .context(error::OtlpMetricsWriteSnafu)?;
-        }
-
-        lines.commit();
-    }
 
+/// Encode histogram data. This function generates insert requests for 3 tables.
+///
+/// The implementation follows the Prometheus histogram table format:
+///
+/// - A `%metric%_bucket` table with an `le` tag that stores the bucket's upper
+///   limit, and `greptime_value` for the cumulative bucket count
+/// - A `%metric%_sum` table storing the sum of samples
+/// - A `%metric%_count` table storing the count of samples.
+///
+/// Thanks to this Prometheus-compatible layout, we hope to be able to use
+/// Prometheus quantile functions on these tables.
+fn encode_histogram(
+    table_writer: &mut MultiTableData,
+    name: &str,
+    hist: &Histogram,
+    resource_attrs: Option<&Vec<KeyValue>>,
+    scope_attrs: Option<&Vec<KeyValue>>,
+) -> Result<()> {
+    let normalized_name = normalize_otlp_name(name);
 
-    let (columns, row_count) = lines.finish();
-    Ok(InsertRequest {
-        table_name: normalize_otlp_name(name),
-        columns,
-        row_count,
-    })
-}
+    let bucket_table_name = format!("{}_bucket", normalized_name);
+    let sum_table_name = format!("{}_sum", normalized_name);
+    let count_table_name = format!("{}_count", normalized_name);
 
-#[allow(dead_code)]
-fn encode_exponential_histogram(name: &str, hist: &ExponentialHistogram) -> Result<InsertRequest> {
-    let mut lines = LinesWriter::with_lines(hist.data_points.len());
+    let data_points_len = hist.data_points.len();
+    // Note that the row and column counts here are approximate
+    let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
+    let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
+    let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
 
     for data_point in &hist.data_points {
-        for attr in &data_point.attributes {
-            write_attribute(&mut lines, attr)?;
-        }
-
-        write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
-
-        // TODO(sunng87): confirm if this working
-        if let Some(positive_buckets) = &data_point.positive {
-            for (idx, count) in positive_buckets.bucket_counts.iter().enumerate() {
-                // here we don't store bucket boundary
-                lines
-                    .write_u64(
-                        &format!("bucket_{}", idx + positive_buckets.offset as usize),
-                        *count,
-                    )
-                    .context(error::OtlpMetricsWriteSnafu)?;
+        let mut accumulated_count = 0;
+        for (idx, count) in data_point.bucket_counts.iter().enumerate() {
+            let mut bucket_row = bucket_table.alloc_one_row();
+            write_tags_and_timestamp(
+                &mut bucket_table,
+                &mut bucket_row,
+                resource_attrs,
+                scope_attrs,
+                Some(data_point.attributes.as_ref()),
+                data_point.time_unix_nano as i64,
+            )?;
+
+            if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
+                row_writer::write_tag(
+                    &mut bucket_table,
+                    HISTOGRAM_LE_COLUMN,
+                    upper_bounds,
+                    &mut bucket_row,
+                )?;
+            } else if idx == data_point.explicit_bounds.len() {
+                // The last bucket
+                row_writer::write_tag(
+                    &mut bucket_table,
+                    HISTOGRAM_LE_COLUMN,
+                    f64::INFINITY,
+                    &mut bucket_row,
+                )?;
             }
-        }
 
-        if let Some(negative_buckets) = &data_point.negative {
-            for (idx, count) in negative_buckets.bucket_counts.iter().enumerate() {
-                lines
-                    .write_u64(
-                        &format!("bucket_{}", idx + negative_buckets.offset as usize),
-                        *count,
-                    )
-                    .context(error::OtlpMetricsWriteSnafu)?;
-            }
-        }
+            accumulated_count += count;
+            row_writer::write_f64(
+                &mut bucket_table,
+                GREPTIME_VALUE,
+                accumulated_count as f64,
+                &mut bucket_row,
+            )?;
 
-        if let Some(min) = data_point.min {
-            lines
-                .write_f64("min", min)
-                .context(error::OtlpMetricsWriteSnafu)?;
+            bucket_table.add_row(bucket_row);
         }
 
-        if let Some(max) = data_point.max {
-            lines
-                .write_f64("max", max)
-                .context(error::OtlpMetricsWriteSnafu)?;
+        if let Some(sum) = data_point.sum {
+            let mut sum_row = sum_table.alloc_one_row();
+            write_tags_and_timestamp(
+                &mut sum_table,
+                &mut sum_row,
+                resource_attrs,
+                scope_attrs,
+                Some(data_point.attributes.as_ref()),
+                data_point.time_unix_nano as i64,
+            )?;
+
+            row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
+            sum_table.add_row(sum_row);
         }
 
-        lines.commit();
+        let mut count_row = count_table.alloc_one_row();
+        write_tags_and_timestamp(
+            &mut count_table,
+            &mut count_row,
+            resource_attrs,
+            scope_attrs,
+            Some(data_point.attributes.as_ref()),
+            data_point.time_unix_nano as i64,
+        )?;
+
+        row_writer::write_f64(
+            &mut count_table,
+            GREPTIME_VALUE,
+            data_point.count as f64,
+            &mut count_row,
+        )?;
+        count_table.add_row(count_row);
     }
 
-    let (columns, row_count) = lines.finish();
-    Ok(InsertRequest {
-        table_name: normalize_otlp_name(name),
-        columns,
-        row_count,
-    })
+    table_writer.add_table_data(bucket_table_name, bucket_table);
+    table_writer.add_table_data(sum_table_name, sum_table);
+    table_writer.add_table_data(count_table_name, count_table);
+
+    Ok(())
+}
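[Reviewer note, not part of the patch] The bucket loop above converts OTLP's per-bucket counts into Prometheus-style cumulative `le` rows, with a final `+Inf` bucket when the index runs past the last explicit bound. A self-contained sketch of that arithmetic, using the same numbers as the unit test below (`bucket_counts: [2, 4, 6, 9, 4]`, `explicit_bounds: [0.1, 1.0, 10.0, 100.0]`):

    /// Cumulative `le` rows: one per bucket, plus the catch-all bucket.
    fn cumulative_buckets(bucket_counts: &[u64], explicit_bounds: &[f64]) -> Vec<(String, u64)> {
        let mut accumulated = 0;
        bucket_counts
            .iter()
            .enumerate()
            .map(|(idx, count)| {
                accumulated += count;
                let le = explicit_bounds
                    .get(idx)
                    .map(|bound| bound.to_string())
                    // past the last bound: f64::INFINITY stringifies as "inf"
                    .unwrap_or_else(|| f64::INFINITY.to_string());
                (le, accumulated)
            })
            .collect()
    }

    // cumulative_buckets(&[2, 4, 6, 9, 4], &[0.1, 1.0, 10.0, 100.0])
    // => [("0.1", 2), ("1", 6), ("10", 12), ("100", 21), ("inf", 25)]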
 
+#[allow(dead_code)]
+fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
+    // TODO(sunng87): implement this using a prometheus compatible way
+    Ok(())
+}
 
 fn encode_summary(
+    table_writer: &mut MultiTableData,
     name: &str,
     summary: &Summary,
     resource_attrs: Option<&Vec<KeyValue>>,
     scope_attrs: Option<&Vec<KeyValue>>,
-) -> Result<InsertRequest> {
-    let mut lines = LinesWriter::with_lines(summary.data_points.len());
+) -> Result<()> {
+    let table = table_writer.get_or_default_table_data(
+        &normalize_otlp_name(name),
+        APPROXIMATE_COLUMN_COUNT,
+        summary.data_points.len(),
+    );
 
     for data_point in &summary.data_points {
-        write_attributes(&mut lines, resource_attrs)?;
-        write_attributes(&mut lines, scope_attrs)?;
-        write_attributes(&mut lines, Some(data_point.attributes.as_ref()))?;
-
-        write_timestamp(&mut lines, data_point.time_unix_nano as i64)?;
+        let mut row = table.alloc_one_row();
+        write_tags_and_timestamp(
+            table,
+            &mut row,
+            resource_attrs,
+            scope_attrs,
+            Some(data_point.attributes.as_ref()),
+            data_point.time_unix_nano as i64,
+        )?;
 
         for quantile in &data_point.quantile_values {
-            // here we don't store bucket boundary
-            lines
-                .write_f64(
-                    &format!("greptime_p{:02}", quantile.quantile * 100f64),
-                    quantile.value,
-                )
-                .context(error::OtlpMetricsWriteSnafu)?;
+            row_writer::write_f64(
+                table,
+                &format!("greptime_p{:02}", quantile.quantile * 100f64),
+                quantile.value,
+                &mut row,
+            )?;
         }
-        lines
-            .write_u64("greptime_count", data_point.count)
-            .context(error::OtlpMetricsWriteSnafu)?;
-
-        lines.commit();
+        row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
+        table.add_row(row);
     }
 
-    let (columns, row_count) = lines.finish();
-    Ok(InsertRequest {
-        table_name: normalize_otlp_name(name),
-        columns,
-        row_count,
-    })
+    Ok(())
 }
 
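[Reviewer note, not part of the patch] Two details of `encode_summary` worth calling out: the quantile column name comes from `format!("greptime_p{:02}", quantile * 100f64)`, and the sample count is now written as an f64 `greptime_count` field (the old code used `write_u64`). The naming scheme, sketched:

    // 0.5  -> "greptime_p50"
    // 0.9  -> "greptime_p90"
    // 0.95 -> "greptime_p95"
    // 0.05 -> "greptime_p05"  (the {:02} zero-pads single-digit percentiles)
    fn quantile_column(quantile: f64) -> String {
        format!("greptime_p{:02}", quantile * 100f64)
    }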
 #[cfg(test)]
@@ -358,7 +398,7 @@ mod tests {
     use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
     use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;
     use opentelemetry_proto::tonic::metrics::v1::summary_data_point::ValueAtQuantile;
-    use opentelemetry_proto::tonic::metrics::v1::NumberDataPoint;
+    use opentelemetry_proto::tonic::metrics::v1::{HistogramDataPoint, NumberDataPoint};
 
     use super::*;
 
@@ -382,6 +422,8 @@ mod tests {
 
     #[test]
     fn test_encode_gauge() {
+        let mut tables = MultiTableData::default();
+
         let data_points = vec![
             NumberDataPoint {
                 attributes: vec![keyvalue("host", "testsevrer")],
                 time_unix_nano: 100,
                 value: Some(Value::AsInt(100)),
                 ..Default::default()
             },
             NumberDataPoint {
                 attributes: vec![keyvalue("host", "testserver")],
                 time_unix_nano: 105,
                 value: Some(Value::AsInt(105)),
                 ..Default::default()
             },
         ];
         let gauge = Gauge { data_points };
-        let inserts = encode_gauge(
+        encode_gauge(
+            &mut tables,
             "datamon",
             &gauge,
             Some(&vec![keyvalue("resource", "app")]),
@@ -405,12 +448,12 @@ mod tests {
         )
         .unwrap();
 
-        assert_eq!(inserts.table_name, "datamon");
-        assert_eq!(inserts.row_count, 2);
-        assert_eq!(inserts.columns.len(), 5);
+        let table = tables.get_or_default_table_data("datamon", 0, 0);
+        assert_eq!(table.num_rows(), 2);
+        assert_eq!(table.num_columns(), 5);
         assert_eq!(
-            inserts
-                .columns
+            table
+                .columns()
                 .iter()
                 .map(|c| &c.column_name)
                 .collect::<Vec<&String>>(),
@@ -426,6 +469,8 @@ mod tests {
 
     #[test]
     fn test_encode_sum() {
+        let mut tables = MultiTableData::default();
+
         let data_points = vec![
             NumberDataPoint {
                 attributes: vec![keyvalue("host", "testserver")],
                 time_unix_nano: 100,
                 value: Some(Value::AsInt(100)),
                 ..Default::default()
             },
             NumberDataPoint {
                 attributes: vec![keyvalue("host", "testserver")],
                 time_unix_nano: 105,
                 value: Some(Value::AsInt(106)),
                 ..Default::default()
             },
         ];
         let sum = Sum {
@@ -444,7 +489,8 @@ mod tests {
             data_points,
             ..Default::default()
         };
-        let inserts = encode_sum(
+        encode_sum(
+            &mut tables,
             "datamon",
             &sum,
             Some(&vec![keyvalue("resource", "app")]),
@@ -452,12 +498,12 @@ mod tests {
         )
         .unwrap();
 
-        assert_eq!(inserts.table_name, "datamon");
-        assert_eq!(inserts.row_count, 2);
-        assert_eq!(inserts.columns.len(), 5);
+        let table = tables.get_or_default_table_data("datamon", 0, 0);
+        assert_eq!(table.num_rows(), 2);
+        assert_eq!(table.num_columns(), 5);
         assert_eq!(
-            inserts
-                .columns
+            table
+                .columns()
                 .iter()
                 .map(|c| &c.column_name)
                 .collect::<Vec<&String>>(),
@@ -473,6 +519,8 @@ mod tests {
 
     #[test]
     fn test_encode_summary() {
+        let mut tables = MultiTableData::default();
+
         let data_points = vec![SummaryDataPoint {
             attributes: vec![keyvalue("host", "testserver")],
             time_unix_nano: 100,
             count: 25,
             sum: 5400.0,
             quantile_values: vec![
                 ValueAtQuantile {
                     quantile: 0.90,
                     value: 1000.0,
                 },
                 ValueAtQuantile {
                     quantile: 0.95,
                     value: 3030.0,
                 },
             ],
@@ -491,7 +539,8 @@ mod tests {
             ..Default::default()
         }];
         let summary = Summary { data_points };
-        let inserts = encode_summary(
+        encode_summary(
+            &mut tables,
             "datamon",
             &summary,
             Some(&vec![keyvalue("resource", "app")]),
@@ -499,12 +548,12 @@ mod tests {
         )
         .unwrap();
 
-        assert_eq!(inserts.table_name, "datamon");
-        assert_eq!(inserts.row_count, 1);
-        assert_eq!(inserts.columns.len(), 7);
+        let table = tables.get_or_default_table_data("datamon", 0, 0);
+        assert_eq!(table.num_rows(), 1);
+        assert_eq!(table.num_columns(), 7);
         assert_eq!(
-            inserts
-                .columns
+            table
+                .columns()
                 .iter()
                 .map(|c| &c.column_name)
                 .collect::<Vec<&String>>(),
             vec![
                 "resource",
                 "scope",
                 "host",
                 "greptime_timestamp",
                 "greptime_p90",
                 "greptime_p95",
                 "greptime_count",
             ]
         );
     }
"greptime_timestamp", + "greptime_value", + ] + ); + } } diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 2a745973ee4e..63a13842b87e 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -334,7 +334,7 @@ pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRe if label.name == METRIC_NAME_LABEL { None } else { - Some((label.name.as_str(), label.value.as_str())) + Some((label.name.to_string(), label.value.as_str())) } }); row_writer::write_tags(table_data, kvs, &mut one_row)?; diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs index 5504b34aa1f8..9dfe92738706 100644 --- a/src/servers/src/row_writer.rs +++ b/src/servers/src/row_writer.rs @@ -27,13 +27,13 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{IncompatibleSchemaSnafu, InfluxdbLinesWriteSnafu, Result, TimePrecisionSnafu}; -pub struct TableData<'a> { +pub struct TableData { schema: Vec, rows: Vec, - column_indexes: HashMap<&'a str, usize>, + column_indexes: HashMap, } -impl TableData<'_> { +impl TableData { pub fn new(num_columns: usize, num_rows: usize) -> Self { Self { schema: Vec::with_capacity(num_columns), @@ -62,16 +62,27 @@ impl TableData<'_> { self.rows.push(Row { values }) } + #[allow(dead_code)] + pub fn columns(&self) -> &Vec { + &self.schema + } + pub fn into_schema_and_rows(self) -> (Vec, Vec) { (self.schema, self.rows) } } -pub struct MultiTableData<'a> { - table_data_map: HashMap<&'a str, TableData<'a>>, +pub struct MultiTableData { + table_data_map: HashMap, +} + +impl Default for MultiTableData { + fn default() -> Self { + Self::new() + } } -impl<'a> MultiTableData<'a> { +impl MultiTableData { pub fn new() -> Self { Self { table_data_map: HashMap::new(), @@ -80,15 +91,25 @@ impl<'a> MultiTableData<'a> { pub fn get_or_default_table_data( &mut self, - table_name: &'a str, + table_name: impl ToString, num_columns: usize, num_rows: usize, - ) -> &mut TableData<'a> { + ) -> &mut TableData { self.table_data_map - .entry(table_name) + .entry(table_name.to_string()) .or_insert_with(|| TableData::new(num_columns, num_rows)) } + pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) { + self.table_data_map + .insert(table_name.to_string(), table_data); + } + + #[allow(dead_code)] + pub fn num_tables(&self) -> usize { + self.table_data_map.len() + } + /// Returns the request and number of rows in it. 
diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs
index 5504b34aa1f8..9dfe92738706 100644
--- a/src/servers/src/row_writer.rs
+++ b/src/servers/src/row_writer.rs
@@ -27,13 +27,13 @@
 use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::error::{IncompatibleSchemaSnafu, InfluxdbLinesWriteSnafu, Result, TimePrecisionSnafu};
 
-pub struct TableData<'a> {
+pub struct TableData {
     schema: Vec<ColumnSchema>,
     rows: Vec<Row>,
-    column_indexes: HashMap<&'a str, usize>,
+    column_indexes: HashMap<String, usize>,
 }
 
-impl TableData<'_> {
+impl TableData {
     pub fn new(num_columns: usize, num_rows: usize) -> Self {
         Self {
             schema: Vec::with_capacity(num_columns),
@@ -62,16 +62,27 @@
         self.rows.push(Row { values })
     }
 
+    #[allow(dead_code)]
+    pub fn columns(&self) -> &Vec<ColumnSchema> {
+        &self.schema
+    }
+
     pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
         (self.schema, self.rows)
     }
 }
 
-pub struct MultiTableData<'a> {
-    table_data_map: HashMap<&'a str, TableData<'a>>,
+pub struct MultiTableData {
+    table_data_map: HashMap<String, TableData>,
+}
+
+impl Default for MultiTableData {
+    fn default() -> Self {
+        Self::new()
+    }
 }
 
-impl<'a> MultiTableData<'a> {
+impl MultiTableData {
     pub fn new() -> Self {
         Self {
             table_data_map: HashMap::new(),
@@ -80,15 +91,25 @@ impl<'a> MultiTableData<'a> {
 
     pub fn get_or_default_table_data(
         &mut self,
-        table_name: &'a str,
+        table_name: impl ToString,
         num_columns: usize,
         num_rows: usize,
-    ) -> &mut TableData<'a> {
+    ) -> &mut TableData {
         self.table_data_map
-            .entry(table_name)
+            .entry(table_name.to_string())
             .or_insert_with(|| TableData::new(num_columns, num_rows))
     }
 
+    pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) {
+        self.table_data_map
+            .insert(table_name.to_string(), table_data);
+    }
+
+    #[allow(dead_code)]
+    pub fn num_tables(&self) -> usize {
+        self.table_data_map.len()
+    }
+
+    /// Returns the request and number of rows in it.
     pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
         let mut total_rows = 0;
 
@@ -106,7 +127,7 @@ impl<'a> MultiTableData<'a> {
                 }
 
                 RowInsertRequest {
-                    table_name: table_name.to_string(),
+                    table_name,
                     rows: Some(Rows { schema, rows }),
                 }
             })
@@ -117,9 +138,9 @@ impl<'a> MultiTableData<'a> {
     }
 }
 
-pub fn write_tags<'a>(
-    table_data: &mut TableData<'a>,
-    kvs: impl Iterator<Item = (&'a str, &'a str)>,
+pub fn write_tags<T: ToString>(
+    table_data: &mut TableData,
+    kvs: impl Iterator<Item = (String, T)>,
     one_row: &mut Vec<Value>,
 ) -> Result<()> {
     let ktv_iter = kvs.map(|(k, v)| {
@@ -132,31 +153,53 @@
     write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
 }
 
-pub fn write_fields<'a>(
-    table_data: &mut TableData<'a>,
-    fields: impl Iterator<Item = (&'a str, ColumnDataType, ValueData)>,
+pub fn write_fields(
+    table_data: &mut TableData,
+    fields: impl Iterator<Item = (String, ColumnDataType, ValueData)>,
     one_row: &mut Vec<Value>,
 ) -> Result<()> {
     write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
 }
 
-pub fn write_f64<'a>(
-    table_data: &mut TableData<'a>,
-    name: &'a str,
+pub fn write_tag(
+    table_data: &mut TableData,
+    name: impl ToString,
+    value: impl ToString,
+    one_row: &mut Vec<Value>,
+) -> Result<()> {
+    write_by_semantic_type(
+        table_data,
+        SemanticType::Tag,
+        std::iter::once((
+            name.to_string(),
+            ColumnDataType::String,
+            ValueData::StringValue(value.to_string()),
+        )),
+        one_row,
+    )
+}
+
+pub fn write_f64(
+    table_data: &mut TableData,
+    name: impl ToString,
     value: f64,
     one_row: &mut Vec<Value>,
 ) -> Result<()> {
     write_fields(
         table_data,
-        std::iter::once((name, ColumnDataType::Float64, ValueData::F64Value(value))),
+        std::iter::once((
+            name.to_string(),
+            ColumnDataType::Float64,
+            ValueData::F64Value(value),
+        )),
         one_row,
     )
 }
 
-fn write_by_semantic_type<'a>(
-    table_data: &mut TableData<'a>,
+fn write_by_semantic_type(
+    table_data: &mut TableData,
     semantic_type: SemanticType,
-    ktv_iter: impl Iterator<Item = (&'a str, ColumnDataType, ValueData)>,
+    ktv_iter: impl Iterator<Item = (String, ColumnDataType, ValueData)>,
     one_row: &mut Vec<Value>,
 ) -> Result<()> {
     let TableData {
@@ -166,7 +209,7 @@ fn write_by_semantic_type(
     } = table_data;
 
     for (name, datatype, value) in ktv_iter {
-        let index = column_indexes.entry(name).or_insert(schema.len());
+        let index = column_indexes.entry(name.clone()).or_insert(schema.len());
         if *index == schema.len() {
             schema.push(ColumnSchema {
                 column_name: name.to_string(),
@@ -183,18 +226,18 @@ fn write_by_semantic_type(
     Ok(())
 }
 
-pub fn write_ts_millis<'a>(
-    table_data: &mut TableData<'a>,
-    name: &'a str,
+pub fn write_ts_millis(
+    table_data: &mut TableData,
+    name: impl ToString,
     ts: Option<i64>,
     one_row: &mut Vec<Value>,
 ) -> Result<()> {
     write_ts_precision(table_data, name, ts, Precision::Millisecond, one_row)
 }
 
-pub fn write_ts_precision<'a>(
-    table_data: &mut TableData<'a>,
-    name: &'a str,
+pub fn write_ts_precision(
+    table_data: &mut TableData,
+    name: impl ToString,
     ts: Option<i64>,
     precision: Precision,
     one_row: &mut Vec<Value>,
 ) -> Result<()> {
@@ -204,6 +247,7 @@ pub fn write_ts_precision(
         column_indexes,
         ..
     } = table_data;
+    let name = name.to_string();
 
     let ts = match ts {
         Some(timestamp) => writer::to_ms_ts(precision, timestamp),
@@ -219,10 +263,10 @@ pub fn write_ts_precision(
         }
     };
 
-    let index = column_indexes.entry(name).or_insert(schema.len());
+    let index = column_indexes.entry(name.clone()).or_insert(schema.len());
     if *index == schema.len() {
         schema.push(ColumnSchema {
-            column_name: name.to_string(),
+            column_name: name,
             datatype: ColumnDataType::TimestampMillisecond as i32,
             semantic_type: SemanticType::Timestamp as i32,
         });
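[Reviewer note, not part of the patch] `write_ts_precision` above normalizes every incoming timestamp to milliseconds via `writer::to_ms_ts` before storing it in a `TimestampMillisecond` column; the OTLP path always passes `Precision::Nanosecond`. A small usage sketch, assuming `to_ms_ts` truncates sub-millisecond precision:

    use common_grpc::writer::{self, Precision};

    fn example() {
        let nanos: i64 = 1_697_000_000_123_456_789;
        let millis = writer::to_ms_ts(Precision::Nanosecond, nanos);
        assert_eq!(millis, 1_697_000_000_123);
    }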
diff --git a/tests-integration/src/otlp.rs b/tests-integration/src/otlp.rs
index 437d8256b49f..942fad14a551 100644
--- a/tests-integration/src/otlp.rs
+++ b/tests-integration/src/otlp.rs
@@ -87,6 +87,65 @@ mod test {
 +------------+-------+--------------------+------------+---------------------+----------------+
 | greptimedb | otel  | java               | testserver | 1970-01-01T00:00:00 | 105.0          |
 | greptimedb | otel  | java               | testsevrer | 1970-01-01T00:00:00 | 100.0          |
++------------+-------+--------------------+------------+---------------------+----------------+",
+        );
+
+        let mut output = instance
+            .do_query(
+                "SELECT le, greptime_value FROM my_test_histo_bucket order by le",
+                ctx.clone(),
+            )
+            .await;
+        let output = output.remove(0).unwrap();
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
+        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
+        assert_eq!(
+            recordbatches.pretty_print().unwrap(),
+            "\
++-----+----------------+
+| le  | greptime_value |
++-----+----------------+
+| 1   | 1.0            |
+| 5   | 3.0            |
+| inf | 4.0            |
++-----+----------------+",
+        );
+
+        let mut output = instance
+            .do_query("SELECT * FROM my_test_histo_sum", ctx.clone())
+            .await;
+        let output = output.remove(0).unwrap();
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
+        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
+        assert_eq!(
+            recordbatches.pretty_print().unwrap(),
+            "\
++------------+-------+--------------------+------------+---------------------+----------------+
+| resource   | scope | telemetry_sdk_name | host       | greptime_timestamp  | greptime_value |
++------------+-------+--------------------+------------+---------------------+----------------+
+| greptimedb | otel  | java               | testserver | 1970-01-01T00:00:00 | 51.0           |
++------------+-------+--------------------+------------+---------------------+----------------+",
+        );
+
+        let mut output = instance
+            .do_query("SELECT * FROM my_test_histo_count", ctx.clone())
+            .await;
+        let output = output.remove(0).unwrap();
+        let Output::Stream(stream) = output else {
+            unreachable!()
+        };
+        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
+        assert_eq!(
+            recordbatches.pretty_print().unwrap(),
+            "\
++------------+-------+--------------------+------------+---------------------+----------------+
+| resource   | scope | telemetry_sdk_name | host       | greptime_timestamp  | greptime_value |
++------------+-------+--------------------+------------+---------------------+----------------+
+| greptimedb | otel  | java               | testserver | 1970-01-01T00:00:00 | 4.0            |
 +------------+-------+--------------------+------------+---------------------+----------------+",
         );
     }
@@ -108,15 +167,38 @@ mod test {
         ];
         let gauge = Gauge { data_points };
 
+        let histo_data_points = vec![HistogramDataPoint {
+            attributes: vec![keyvalue("host", "testserver")],
+            time_unix_nano: 100,
+            count: 4,
+            bucket_counts: vec![1, 2, 1],
+            explicit_bounds: vec![1.0f64, 5.0f64],
+            sum: Some(51f64),
+            ..Default::default()
+        }];
+
+        let histo = Histogram {
+            data_points: histo_data_points,
+            aggregation_temporality: 0,
+        };
+
         ExportMetricsServiceRequest {
             resource_metrics: vec![ResourceMetrics {
                 scope_metrics: vec![ScopeMetrics {
-                    metrics: vec![Metric {
-                        name: "my.test.metric".into(),
-                        description: "my ignored desc".into(),
-                        unit: "my ignored unit".into(),
-                        data: Some(metric::Data::Gauge(gauge)),
-                    }],
+                    metrics: vec![
+                        Metric {
+                            name: "my.test.metric".into(),
+                            description: "my ignored desc".into(),
+                            unit: "my ignored unit".into(),
+                            data: Some(metric::Data::Gauge(gauge)),
+                        },
+                        Metric {
+                            name: "my.test.histo".into(),
+                            description: "my ignored desc".into(),
+                            unit: "my ignored unit".into(),
+                            data: Some(metric::Data::Histogram(histo)),
+                        },
+                    ],
                     scope: Some(InstrumentationScope {
                         attributes: vec![
                             keyvalue("scope", "otel"),