diff --git a/Cargo.lock b/Cargo.lock index 644d1ccdd996..89a4729c58a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1774,6 +1774,7 @@ dependencies = [ "common-recordbatch", "common-runtime", "common-telemetry", + "common-time", "criterion 0.4.0", "dashmap", "datafusion", diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index cae229548713..416566506921 100644 --- a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -14,6 +14,7 @@ common-error = { workspace = true } common-recordbatch = { workspace = true } common-runtime = { workspace = true } common-telemetry = { workspace = true } +common-time = { workspace = true } dashmap = "5.4" datafusion.workspace = true datatypes = { workspace = true } diff --git a/src/common/grpc/src/error.rs b/src/common/grpc/src/error.rs index 5b9f46c86d68..09e6082b7908 100644 --- a/src/common/grpc/src/error.rs +++ b/src/common/grpc/src/error.rs @@ -75,6 +75,9 @@ pub enum Error { location: Location, source: datatypes::error::Error, }, + + #[snafu(display("Not supported: {}", feat))] + NotSupported { feat: String }, } impl ErrorExt for Error { @@ -83,7 +86,8 @@ impl ErrorExt for Error { Error::InvalidTlsConfig { .. } | Error::InvalidConfigFilePath { .. } | Error::TypeMismatch { .. } - | Error::InvalidFlightData { .. } => StatusCode::InvalidArguments, + | Error::InvalidFlightData { .. } + | Error::NotSupported { .. } => StatusCode::InvalidArguments, Error::CreateChannel { .. } | Error::Conversion { .. } diff --git a/src/common/grpc/src/writer.rs b/src/common/grpc/src/writer.rs index d601ef2a0b6b..0ea270d388bc 100644 --- a/src/common/grpc/src/writer.rs +++ b/src/common/grpc/src/writer.rs @@ -18,9 +18,11 @@ use std::fmt::Display; use api::helper::values_with_capacity; use api::v1::{Column, ColumnDataType, SemanticType}; use common_base::BitVec; +use common_time::timestamp::TimeUnit; use snafu::ensure; use crate::error::{Result, TypeMismatchSnafu}; +use crate::Error; type ColumnName = String; @@ -259,6 +261,24 @@ impl Display for Precision { } } +impl TryFrom for TimeUnit { + type Error = Error; + + fn try_from(precision: Precision) -> std::result::Result { + Ok(match precision { + Precision::Second => TimeUnit::Second, + Precision::Millisecond => TimeUnit::Millisecond, + Precision::Microsecond => TimeUnit::Microsecond, + Precision::Nanosecond => TimeUnit::Nanosecond, + _ => { + return Err(Error::NotSupported { + feat: format!("convert {precision} into TimeUnit"), + }) + } + }) + } +} + #[cfg(test)] mod tests { use api::v1::{ColumnDataType, SemanticType}; diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 2b25b934974b..0e718250764e 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -154,9 +154,9 @@ impl PromStoreProtocolHandler for Instance { .as_ref() .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite) .context(AuthSnafu)?; - let (requests, samples) = prom_store::to_grpc_insert_requests(request)?; + let (requests, samples) = prom_store::to_grpc_row_insert_requests(request)?; let _ = self - .handle_inserts(requests, ctx) + .handle_row_inserts(requests, ctx) .await .map_err(BoxedError::new) .context(error::ExecuteGrpcQuerySnafu)?; diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index 1ad648d83f70..617298c7d066 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -16,20 +16,16 @@ use std::collections::HashMap; use api::v1::value::ValueData; use api::v1::{ - ColumnDataType, ColumnSchema, InsertRequest as GrpcInsertRequest, InsertRequests, Row, - RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value, + ColumnDataType, InsertRequest as GrpcInsertRequest, InsertRequests, RowInsertRequests, }; -use common_grpc::writer; use common_grpc::writer::{LinesWriter, Precision}; use common_time::timestamp::TimeUnit; use common_time::Timestamp; -use influxdb_line_protocol::{parse_lines, FieldSet, FieldValue, TagSet}; -use snafu::{ensure, OptionExt, ResultExt}; +use influxdb_line_protocol::{parse_lines, FieldValue}; +use snafu::{OptionExt, ResultExt}; -use crate::error::{ - Error, IncompatibleSchemaSnafu, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu, - TimePrecisionSnafu, -}; +use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu, TimePrecisionSnafu}; +use crate::row_writer::{self, MultiTableData}; pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts"; pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond; @@ -107,7 +103,7 @@ impl TryFrom for InsertRequests { } else { let precision = unwrap_or_default_precision(value.precision); let timestamp = Timestamp::current_millis(); - let unit = get_time_unit(precision)?; + let unit: TimeUnit = precision.try_into().context(InfluxdbLinesWriteSnafu)?; let timestamp = timestamp .convert_to(unit) .with_context(|| TimePrecisionSnafu { @@ -147,13 +143,7 @@ impl TryFrom for RowInsertRequests { .collect::>>() .context(InfluxdbLineProtocolSnafu)?; - struct TableData<'a> { - schema: Vec, - rows: Vec, - column_indexes: HashMap<&'a str, usize>, - } - - let mut table_data_map = HashMap::new(); + let mut multi_table_data = MultiTableData::new(); for line in &lines { let table_name = line.series.measurement.as_str(); @@ -163,192 +153,46 @@ impl TryFrom for RowInsertRequests { // tags.len + fields.len + timestamp(+1) let num_columns = tags.as_ref().map(|x| x.len()).unwrap_or(0) + fields.len() + 1; - let TableData { - schema, - rows, - column_indexes, - } = table_data_map - .entry(table_name) - .or_insert_with(|| TableData { - schema: Vec::with_capacity(num_columns), - rows: Vec::new(), - column_indexes: HashMap::with_capacity(num_columns), - }); - - let mut one_row = vec![Value { value_data: None }; schema.len()]; + let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 0); + let mut one_row = table_data.alloc_one_row(); // tags - parse_tags(tags, column_indexes, schema, &mut one_row)?; - // fields - parse_fields(fields, column_indexes, schema, &mut one_row)?; - // timestamp - parse_ts(ts, value.precision, column_indexes, schema, &mut one_row)?; - - rows.push(Row { values: one_row }); - } - - let inserts = table_data_map - .into_iter() - .map( - |( - table_name, - TableData { - schema, mut rows, .. - }, - )| { - let num_columns = schema.len(); - for row in rows.iter_mut() { - if num_columns > row.values.len() { - row.values.resize(num_columns, Value { value_data: None }); - } - } - - RowInsertRequest { - table_name: table_name.to_string(), - rows: Some(Rows { schema, rows }), - ..Default::default() - } - }, - ) - .collect::>(); - - Ok(RowInsertRequests { inserts }) - } -} + if let Some(tags) = tags { + let kvs = tags.iter().map(|(k, v)| (k.as_str(), v.as_str())); + row_writer::write_tags(table_data, kvs, &mut one_row)?; + } -fn parse_tags<'a>( - tags: &'a Option, - column_indexes: &mut HashMap<&'a str, usize>, - schema: &mut Vec, - one_row: &mut Vec, -) -> Result<(), Error> { - let Some(tags) = tags else { - return Ok(()); - }; - - for (k, v) in tags { - let index = column_indexes.entry(k.as_str()).or_insert(schema.len()); - if *index == schema.len() { - schema.push(ColumnSchema { - column_name: k.to_string(), - datatype: ColumnDataType::String as i32, - semantic_type: SemanticType::Tag as i32, + // fields + let fields = fields.iter().map(|(k, v)| { + let (datatype, value) = match v { + FieldValue::I64(v) => (ColumnDataType::Int64, ValueData::I64Value(*v)), + FieldValue::U64(v) => (ColumnDataType::Uint64, ValueData::U64Value(*v)), + FieldValue::F64(v) => (ColumnDataType::Float64, ValueData::F64Value(*v)), + FieldValue::String(v) => ( + ColumnDataType::String, + ValueData::StringValue(v.to_string()), + ), + FieldValue::Boolean(v) => (ColumnDataType::Boolean, ValueData::BoolValue(*v)), + }; + (k.as_str(), datatype, value) }); - one_row.push(ValueData::StringValue(v.to_string()).into()); - } else { - check_schema(ColumnDataType::String, SemanticType::Tag, &schema[*index])?; - one_row[*index].value_data = Some(ValueData::StringValue(v.to_string())); - } - } - - Ok(()) -} - -fn parse_fields<'a>( - fields: &'a FieldSet, - column_indexes: &mut HashMap<&'a str, usize>, - schema: &mut Vec, - one_row: &mut Vec, -) -> Result<(), Error> { - for (k, v) in fields { - let index = column_indexes.entry(k.as_str()).or_insert(schema.len()); - let (datatype, value) = match v { - FieldValue::I64(v) => (ColumnDataType::Int64, ValueData::I64Value(*v)), - FieldValue::U64(v) => (ColumnDataType::Uint64, ValueData::U64Value(*v)), - FieldValue::F64(v) => (ColumnDataType::Float64, ValueData::F64Value(*v)), - FieldValue::String(v) => ( - ColumnDataType::String, - ValueData::StringValue(v.to_string()), - ), - FieldValue::Boolean(v) => (ColumnDataType::Boolean, ValueData::BoolValue(*v)), - }; + row_writer::write_fields(table_data, fields, &mut one_row)?; - if *index == schema.len() { - schema.push(ColumnSchema { - column_name: k.to_string(), - datatype: datatype as i32, - semantic_type: SemanticType::Field as i32, - }); - one_row.push(value.into()); - } else { - check_schema(datatype, SemanticType::Field, &schema[*index])?; - one_row[*index].value_data = Some(value); + // timestamp + let precision = unwrap_or_default_precision(value.precision); + row_writer::write_ts_precision( + table_data, + INFLUXDB_TIMESTAMP_COLUMN_NAME, + ts, + precision, + &mut one_row, + )?; + + table_data.add_row(one_row); } - } - Ok(()) -} - -fn parse_ts( - ts: Option, - precision: Option, - column_indexes: &mut HashMap<&str, usize>, - schema: &mut Vec, - one_row: &mut Vec, -) -> Result<(), Error> { - let precision = unwrap_or_default_precision(precision); - let ts = match ts { - Some(timestamp) => writer::to_ms_ts(precision, timestamp), - None => { - let timestamp = Timestamp::current_millis(); - let unit = get_time_unit(precision)?; - let timestamp = timestamp - .convert_to(unit) - .with_context(|| TimePrecisionSnafu { - name: precision.to_string(), - })?; - writer::to_ms_ts(precision, timestamp.into()) - } - }; - - let column_name = INFLUXDB_TIMESTAMP_COLUMN_NAME; - let index = column_indexes.entry(column_name).or_insert(schema.len()); - if *index == schema.len() { - schema.push(ColumnSchema { - column_name: column_name.to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, - semantic_type: SemanticType::Timestamp as i32, - }); - one_row.push(ValueData::TsMillisecondValue(ts).into()) - } else { - check_schema( - ColumnDataType::TimestampMillisecond, - SemanticType::Timestamp, - &schema[*index], - )?; - one_row[*index].value_data = Some(ValueData::TsMillisecondValue(ts)); + Ok(multi_table_data.into_row_insert_requests().0) } - - Ok(()) -} - -#[inline] -fn check_schema( - datatype: ColumnDataType, - semantic_type: SemanticType, - schema: &ColumnSchema, -) -> Result<(), Error> { - ensure!( - schema.datatype == datatype as i32, - IncompatibleSchemaSnafu { - column_name: &schema.column_name, - datatype: "datatype", - expected: schema.datatype, - actual: datatype as i32, - } - ); - - ensure!( - schema.semantic_type == semantic_type as i32, - IncompatibleSchemaSnafu { - column_name: &schema.column_name, - datatype: "semantic_type", - expected: schema.semantic_type, - actual: semantic_type as i32, - } - ); - - Ok(()) } #[inline] @@ -360,25 +204,11 @@ fn unwrap_or_default_precision(precision: Option) -> Precision { } } -#[inline] -fn get_time_unit(precision: Precision) -> Result { - Ok(match precision { - Precision::Second => TimeUnit::Second, - Precision::Millisecond => TimeUnit::Millisecond, - Precision::Microsecond => TimeUnit::Microsecond, - Precision::Nanosecond => TimeUnit::Nanosecond, - _ => { - return Err(Error::NotSupported { - feat: format!("convert {precision} into TimeUnit"), - }) - } - }) -} - #[cfg(test)] mod tests { use api::v1::column::Values; - use api::v1::{Column, ColumnDataType, SemanticType}; + use api::v1::value::ValueData; + use api::v1::{Column, ColumnDataType, Rows, SemanticType}; use common_base::BitVec; use super::*; diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index acfd12c6ccd4..2bb142977d4f 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -37,6 +37,7 @@ pub mod postgres; pub mod prom_store; pub mod prometheus_handler; pub mod query_handler; +mod row_writer; pub mod server; mod shutdown; pub mod tls; diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index f87ce71741ec..77cd6c8af49e 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -20,7 +20,7 @@ use std::hash::{Hash, Hasher}; use api::prom_store::remote::label_matcher::Type as MatcherType; use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest}; -use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests}; +use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests, RowInsertRequests}; use common_grpc::writer::{LinesWriter, Precision}; use common_recordbatch::{RecordBatch, RecordBatches}; use common_time::timestamp::TimeUnit; @@ -34,6 +34,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use snap::raw::{Decoder, Encoder}; use crate::error::{self, Result}; +use crate::row_writer::{self, MultiTableData}; pub const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp"; pub const FIELD_COLUMN_NAME: &str = "greptime_value"; @@ -300,6 +301,61 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result Result<(RowInsertRequests, usize)> { + let mut multi_table_data = MultiTableData::new(); + + for series in &request.timeseries { + let table_name = &series + .labels + .iter() + .find(|label| { + // The metric name is a special label + label.name == METRIC_NAME_LABEL + }) + .context(error::InvalidPromRemoteRequestSnafu { + msg: "missing '__name__' label in time-series", + })? + .value; + + // The metric name is a special label, + // num_columns = labels.len() - 1 + 1 (value) + 1 (timestamp) + let num_columns = series.labels.len() + 1; + + let table_data = multi_table_data.get_or_default_table_data( + table_name, + num_columns, + series.samples.len(), + ); + + for Sample { value, timestamp } in &series.samples { + let mut one_row = table_data.alloc_one_row(); + + // labels + let kvs = series.labels.iter().filter_map(|label| { + if label.name == METRIC_NAME_LABEL { + None + } else { + Some((label.name.as_str(), label.value.as_str())) + } + }); + row_writer::write_tags(table_data, kvs, &mut one_row)?; + // value + row_writer::write_f64(table_data, FIELD_COLUMN_NAME, *value, &mut one_row)?; + // timestamp + row_writer::write_ts_millis( + table_data, + TIMESTAMP_COLUMN_NAME, + Some(*timestamp), + &mut one_row, + )?; + + table_data.add_row(one_row); + } + } + + Ok(multi_table_data.into_row_insert_requests()) +} + pub fn to_grpc_insert_requests(request: WriteRequest) -> Result<(InsertRequests, usize)> { let mut writers: HashMap = HashMap::new(); for timeseries in &request.timeseries { @@ -450,6 +506,7 @@ mod tests { use std::sync::Arc; use api::prom_store::remote::LabelMatcher; + use api::v1::{ColumnDataType, Row, SemanticType}; use datafusion::prelude::SessionContext; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector}; @@ -564,6 +621,141 @@ mod tests { assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?", display_string); } + fn column_schemas_with( + mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>, + ) -> Vec { + kts_iter.push(( + "greptime_value", + ColumnDataType::Float64, + SemanticType::Field, + )); + kts_iter.push(( + "greptime_timestamp", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + )); + + kts_iter + .into_iter() + .map(|(k, t, s)| api::v1::ColumnSchema { + column_name: k.to_string(), + datatype: t as i32, + semantic_type: s as i32, + }) + .collect() + } + + fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row { + Row { + values: vec![ + api::v1::Value { + value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())), + }, + api::v1::Value { + value_data: Some(api::v1::value::ValueData::F64Value(value)), + }, + api::v1::Value { + value_data: Some(api::v1::value::ValueData::TsMillisecondValue(timestamp)), + }, + ], + } + } + + fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row { + Row { + values: vec![ + api::v1::Value { + value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())), + }, + api::v1::Value { + value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())), + }, + api::v1::Value { + value_data: Some(api::v1::value::ValueData::F64Value(value)), + }, + api::v1::Value { + value_data: Some(api::v1::value::ValueData::TsMillisecondValue(timestamp)), + }, + ], + } + } + + #[test] + fn test_write_request_to_row_insert_exprs() { + let write_request = WriteRequest { + timeseries: mock_timeseries(), + ..Default::default() + }; + + let mut exprs = to_grpc_row_insert_requests(write_request) + .unwrap() + .0 + .inserts; + exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name)); + assert_eq!(3, exprs.len()); + assert_eq!("metric1", exprs[0].table_name); + assert_eq!("metric2", exprs[1].table_name); + assert_eq!("metric3", exprs[2].table_name); + + let rows = exprs[0].rows.as_ref().unwrap(); + let schema = &rows.schema; + let rows = &rows.rows; + assert_eq!(2, rows.len()); + assert_eq!(3, schema.len()); + assert_eq!( + column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]), + *schema + ); + assert_eq!( + &vec![ + make_row_with_label("spark", 1.0, 1000), + make_row_with_label("spark", 2.0, 2000), + ], + rows + ); + + let rows = exprs[1].rows.as_ref().unwrap(); + let schema = &rows.schema; + let rows = &rows.rows; + assert_eq!(2, rows.len()); + assert_eq!(4, schema.len()); + assert_eq!( + column_schemas_with(vec![ + ("instance", ColumnDataType::String, SemanticType::Tag), + ("idc", ColumnDataType::String, SemanticType::Tag) + ]), + *schema + ); + assert_eq!( + &vec![ + make_row_with_2_labels("test_host1", "z001", 3.0, 1000), + make_row_with_2_labels("test_host1", "z001", 4.0, 2000), + ], + rows + ); + + let rows = exprs[2].rows.as_ref().unwrap(); + let schema = &rows.schema; + let rows = &rows.rows; + assert_eq!(3, rows.len()); + assert_eq!(4, schema.len()); + assert_eq!( + column_schemas_with(vec![ + ("idc", ColumnDataType::String, SemanticType::Tag), + ("app", ColumnDataType::String, SemanticType::Tag) + ]), + *schema + ); + assert_eq!( + &vec![ + make_row_with_2_labels("z002", "biz", 5.0, 1000), + make_row_with_2_labels("z002", "biz", 6.0, 2000), + make_row_with_2_labels("z002", "biz", 7.0, 3000), + ], + rows + ); + } + #[test] fn test_write_request_to_insert_exprs() { let write_request = WriteRequest { diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs new file mode 100644 index 000000000000..ab6b80d23355 --- /dev/null +++ b/src/servers/src/row_writer.rs @@ -0,0 +1,270 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::v1::value::ValueData; +use api::v1::{ + ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, + Value, +}; +use common_grpc::writer; +use common_grpc::writer::Precision; +use common_time::timestamp::TimeUnit; +use common_time::Timestamp; +use snafu::{ensure, OptionExt, ResultExt}; + +use crate::error::{IncompatibleSchemaSnafu, InfluxdbLinesWriteSnafu, Result, TimePrecisionSnafu}; + +pub struct TableData<'a> { + schema: Vec, + rows: Vec, + column_indexes: HashMap<&'a str, usize>, +} + +impl TableData<'_> { + pub fn new(num_columns: usize, num_rows: usize) -> Self { + Self { + schema: Vec::with_capacity(num_columns), + rows: Vec::with_capacity(num_rows), + column_indexes: HashMap::with_capacity(num_columns), + } + } + + #[inline] + pub fn num_columns(&self) -> usize { + self.schema.len() + } + + #[inline] + pub fn num_rows(&self) -> usize { + self.rows.len() + } + + #[inline] + pub fn alloc_one_row(&self) -> Vec { + vec![Value { value_data: None }; self.num_columns()] + } + + #[inline] + pub fn add_row(&mut self, values: Vec) { + self.rows.push(Row { values }) + } + + pub fn into_schema_and_rows(self) -> (Vec, Vec) { + (self.schema, self.rows) + } +} + +pub struct MultiTableData<'a> { + table_data_map: HashMap<&'a str, TableData<'a>>, +} + +impl<'a> MultiTableData<'a> { + pub fn new() -> Self { + Self { + table_data_map: HashMap::new(), + } + } + + pub fn get_or_default_table_data( + &mut self, + table_name: &'a str, + num_columns: usize, + num_rows: usize, + ) -> &mut TableData<'a> { + self.table_data_map + .entry(table_name) + .or_insert_with(|| TableData::new(num_columns, num_rows)) + } + + /// Returns the request and number of rows in it. + pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) { + let mut total_rows = 0; + let inserts = self + .table_data_map + .into_iter() + .map(|(table_name, table_data)| { + total_rows += table_data.num_rows(); + let num_columns = table_data.num_columns(); + let (schema, mut rows) = table_data.into_schema_and_rows(); + for row in &mut rows { + if num_columns > row.values.len() { + row.values.resize(num_columns, Value { value_data: None }); + } + } + + RowInsertRequest { + table_name: table_name.to_string(), + rows: Some(Rows { schema, rows }), + ..Default::default() + } + }) + .collect::>(); + let row_insert_requests = RowInsertRequests { inserts }; + + (row_insert_requests, total_rows) + } +} + +pub fn write_tags<'a>( + table_data: &mut TableData<'a>, + kvs: impl Iterator, + one_row: &mut Vec, +) -> Result<()> { + let ktv_iter = kvs.map(|(k, v)| { + ( + k, + ColumnDataType::String, + ValueData::StringValue(v.to_string()), + ) + }); + write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row) +} + +pub fn write_fields<'a>( + table_data: &mut TableData<'a>, + fields: impl Iterator, + one_row: &mut Vec, +) -> Result<()> { + write_by_semantic_type(table_data, SemanticType::Field, fields, one_row) +} + +pub fn write_f64<'a>( + table_data: &mut TableData<'a>, + name: &'a str, + value: f64, + one_row: &mut Vec, +) -> Result<()> { + write_fields( + table_data, + std::iter::once((name, ColumnDataType::Float64, ValueData::F64Value(value))), + one_row, + ) +} + +fn write_by_semantic_type<'a>( + table_data: &mut TableData<'a>, + semantic_type: SemanticType, + ktv_iter: impl Iterator, + one_row: &mut Vec, +) -> Result<()> { + let TableData { + schema, + column_indexes, + .. + } = table_data; + + for (name, datatype, value) in ktv_iter { + let index = column_indexes.entry(name).or_insert(schema.len()); + if *index == schema.len() { + schema.push(ColumnSchema { + column_name: name.to_string(), + datatype: datatype as i32, + semantic_type: semantic_type as i32, + }); + one_row.push(value.into()); + } else { + check_schema(datatype, semantic_type, &schema[*index])?; + one_row[*index].value_data = Some(value); + } + } + + Ok(()) +} + +pub fn write_ts_millis<'a>( + table_data: &mut TableData<'a>, + name: &'a str, + ts: Option, + one_row: &mut Vec, +) -> Result<()> { + write_ts_precision(table_data, name, ts, Precision::Millisecond, one_row) +} + +pub fn write_ts_precision<'a>( + table_data: &mut TableData<'a>, + name: &'a str, + ts: Option, + precision: Precision, + one_row: &mut Vec, +) -> Result<()> { + let TableData { + schema, + column_indexes, + .. + } = table_data; + + let ts = match ts { + Some(timestamp) => writer::to_ms_ts(precision, timestamp), + None => { + let timestamp = Timestamp::current_millis(); + let unit: TimeUnit = precision.try_into().context(InfluxdbLinesWriteSnafu)?; + let timestamp = timestamp + .convert_to(unit) + .with_context(|| TimePrecisionSnafu { + name: precision.to_string(), + })?; + writer::to_ms_ts(precision, timestamp.into()) + } + }; + + let index = column_indexes.entry(name).or_insert(schema.len()); + if *index == schema.len() { + schema.push(ColumnSchema { + column_name: name.to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + }); + one_row.push(ValueData::TsMillisecondValue(ts).into()) + } else { + check_schema( + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + &schema[*index], + )?; + one_row[*index].value_data = Some(ValueData::TsMillisecondValue(ts)); + } + + Ok(()) +} + +#[inline] +fn check_schema( + datatype: ColumnDataType, + semantic_type: SemanticType, + schema: &ColumnSchema, +) -> Result<()> { + ensure!( + schema.datatype == datatype as i32, + IncompatibleSchemaSnafu { + column_name: &schema.column_name, + datatype: "datatype", + expected: schema.datatype, + actual: datatype as i32, + } + ); + + ensure!( + schema.semantic_type == semantic_type as i32, + IncompatibleSchemaSnafu { + column_name: &schema.column_name, + datatype: "semantic_type", + expected: schema.semantic_type, + actual: semantic_type as i32, + } + ); + + Ok(()) +}