Skip to content

Commit

Permalink
feat: prometheus row inserter (GreptimeTeam#2263)
Browse files Browse the repository at this point in the history
* feat: prometheus row inserter

* chore: add unit test

* refactor: to row_insert_requests

* chore: typo

* chore: alloc row by TableData

* chore: by review comment
  • Loading branch information
fengjiachun authored and paomian committed Oct 19, 2023
1 parent 8d0c33a commit 4653660
Show file tree
Hide file tree
Showing 9 changed files with 534 additions and 215 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ common-error = { workspace = true }
common-recordbatch = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
common-time = { workspace = true }
dashmap = "5.4"
datafusion.workspace = true
datatypes = { workspace = true }
Expand Down
6 changes: 5 additions & 1 deletion src/common/grpc/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub enum Error {
location: Location,
source: datatypes::error::Error,
},

#[snafu(display("Not supported: {}", feat))]
NotSupported { feat: String },
}

impl ErrorExt for Error {
Expand All @@ -83,7 +86,8 @@ impl ErrorExt for Error {
Error::InvalidTlsConfig { .. }
| Error::InvalidConfigFilePath { .. }
| Error::TypeMismatch { .. }
| Error::InvalidFlightData { .. } => StatusCode::InvalidArguments,
| Error::InvalidFlightData { .. }
| Error::NotSupported { .. } => StatusCode::InvalidArguments,

Error::CreateChannel { .. }
| Error::Conversion { .. }
Expand Down
20 changes: 20 additions & 0 deletions src/common/grpc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ use std::fmt::Display;
use api::helper::values_with_capacity;
use api::v1::{Column, ColumnDataType, SemanticType};
use common_base::BitVec;
use common_time::timestamp::TimeUnit;
use snafu::ensure;

use crate::error::{Result, TypeMismatchSnafu};
use crate::Error;

type ColumnName = String;

Expand Down Expand Up @@ -259,6 +261,24 @@ impl Display for Precision {
}
}

impl TryFrom<Precision> for TimeUnit {
type Error = Error;

fn try_from(precision: Precision) -> std::result::Result<Self, Self::Error> {
Ok(match precision {
Precision::Second => TimeUnit::Second,
Precision::Millisecond => TimeUnit::Millisecond,
Precision::Microsecond => TimeUnit::Microsecond,
Precision::Nanosecond => TimeUnit::Nanosecond,
_ => {
return Err(Error::NotSupported {
feat: format!("convert {precision} into TimeUnit"),
})
}
})
}
}

#[cfg(test)]
mod tests {
use api::v1::{ColumnDataType, SemanticType};
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ impl PromStoreProtocolHandler for Instance {
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
.context(AuthSnafu)?;
let (requests, samples) = prom_store::to_grpc_insert_requests(request)?;
let (requests, samples) = prom_store::to_grpc_row_insert_requests(request)?;
let _ = self
.handle_inserts(requests, ctx)
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
Expand Down
252 changes: 41 additions & 211 deletions src/servers/src/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,16 @@ use std::collections::HashMap;

use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnSchema, InsertRequest as GrpcInsertRequest, InsertRequests, Row,
RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
ColumnDataType, InsertRequest as GrpcInsertRequest, InsertRequests, RowInsertRequests,
};
use common_grpc::writer;
use common_grpc::writer::{LinesWriter, Precision};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use influxdb_line_protocol::{parse_lines, FieldSet, FieldValue, TagSet};
use snafu::{ensure, OptionExt, ResultExt};
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::{OptionExt, ResultExt};

use crate::error::{
Error, IncompatibleSchemaSnafu, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu,
TimePrecisionSnafu,
};
use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu, TimePrecisionSnafu};
use crate::row_writer::{self, MultiTableData};

pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
Expand Down Expand Up @@ -107,7 +103,7 @@ impl TryFrom<InfluxdbRequest> for InsertRequests {
} else {
let precision = unwrap_or_default_precision(value.precision);
let timestamp = Timestamp::current_millis();
let unit = get_time_unit(precision)?;
let unit: TimeUnit = precision.try_into().context(InfluxdbLinesWriteSnafu)?;
let timestamp = timestamp
.convert_to(unit)
.with_context(|| TimePrecisionSnafu {
Expand Down Expand Up @@ -147,13 +143,7 @@ impl TryFrom<InfluxdbRequest> for RowInsertRequests {
.collect::<influxdb_line_protocol::Result<Vec<_>>>()
.context(InfluxdbLineProtocolSnafu)?;

struct TableData<'a> {
schema: Vec<ColumnSchema>,
rows: Vec<Row>,
column_indexes: HashMap<&'a str, usize>,
}

let mut table_data_map = HashMap::new();
let mut multi_table_data = MultiTableData::new();

for line in &lines {
let table_name = line.series.measurement.as_str();
Expand All @@ -163,192 +153,46 @@ impl TryFrom<InfluxdbRequest> for RowInsertRequests {
// tags.len + fields.len + timestamp(+1)
let num_columns = tags.as_ref().map(|x| x.len()).unwrap_or(0) + fields.len() + 1;

let TableData {
schema,
rows,
column_indexes,
} = table_data_map
.entry(table_name)
.or_insert_with(|| TableData {
schema: Vec::with_capacity(num_columns),
rows: Vec::new(),
column_indexes: HashMap::with_capacity(num_columns),
});

let mut one_row = vec![Value { value_data: None }; schema.len()];
let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 0);
let mut one_row = table_data.alloc_one_row();

// tags
parse_tags(tags, column_indexes, schema, &mut one_row)?;
// fields
parse_fields(fields, column_indexes, schema, &mut one_row)?;
// timestamp
parse_ts(ts, value.precision, column_indexes, schema, &mut one_row)?;

rows.push(Row { values: one_row });
}

let inserts = table_data_map
.into_iter()
.map(
|(
table_name,
TableData {
schema, mut rows, ..
},
)| {
let num_columns = schema.len();
for row in rows.iter_mut() {
if num_columns > row.values.len() {
row.values.resize(num_columns, Value { value_data: None });
}
}

RowInsertRequest {
table_name: table_name.to_string(),
rows: Some(Rows { schema, rows }),
..Default::default()
}
},
)
.collect::<Vec<_>>();

Ok(RowInsertRequests { inserts })
}
}
if let Some(tags) = tags {
let kvs = tags.iter().map(|(k, v)| (k.as_str(), v.as_str()));
row_writer::write_tags(table_data, kvs, &mut one_row)?;
}

fn parse_tags<'a>(
tags: &'a Option<TagSet>,
column_indexes: &mut HashMap<&'a str, usize>,
schema: &mut Vec<ColumnSchema>,
one_row: &mut Vec<Value>,
) -> Result<(), Error> {
let Some(tags) = tags else {
return Ok(());
};

for (k, v) in tags {
let index = column_indexes.entry(k.as_str()).or_insert(schema.len());
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: k.to_string(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
// fields
let fields = fields.iter().map(|(k, v)| {
let (datatype, value) = match v {
FieldValue::I64(v) => (ColumnDataType::Int64, ValueData::I64Value(*v)),
FieldValue::U64(v) => (ColumnDataType::Uint64, ValueData::U64Value(*v)),
FieldValue::F64(v) => (ColumnDataType::Float64, ValueData::F64Value(*v)),
FieldValue::String(v) => (
ColumnDataType::String,
ValueData::StringValue(v.to_string()),
),
FieldValue::Boolean(v) => (ColumnDataType::Boolean, ValueData::BoolValue(*v)),
};
(k.as_str(), datatype, value)
});
one_row.push(ValueData::StringValue(v.to_string()).into());
} else {
check_schema(ColumnDataType::String, SemanticType::Tag, &schema[*index])?;
one_row[*index].value_data = Some(ValueData::StringValue(v.to_string()));
}
}

Ok(())
}

fn parse_fields<'a>(
fields: &'a FieldSet,
column_indexes: &mut HashMap<&'a str, usize>,
schema: &mut Vec<ColumnSchema>,
one_row: &mut Vec<Value>,
) -> Result<(), Error> {
for (k, v) in fields {
let index = column_indexes.entry(k.as_str()).or_insert(schema.len());
let (datatype, value) = match v {
FieldValue::I64(v) => (ColumnDataType::Int64, ValueData::I64Value(*v)),
FieldValue::U64(v) => (ColumnDataType::Uint64, ValueData::U64Value(*v)),
FieldValue::F64(v) => (ColumnDataType::Float64, ValueData::F64Value(*v)),
FieldValue::String(v) => (
ColumnDataType::String,
ValueData::StringValue(v.to_string()),
),
FieldValue::Boolean(v) => (ColumnDataType::Boolean, ValueData::BoolValue(*v)),
};
row_writer::write_fields(table_data, fields, &mut one_row)?;

if *index == schema.len() {
schema.push(ColumnSchema {
column_name: k.to_string(),
datatype: datatype as i32,
semantic_type: SemanticType::Field as i32,
});
one_row.push(value.into());
} else {
check_schema(datatype, SemanticType::Field, &schema[*index])?;
one_row[*index].value_data = Some(value);
// timestamp
let precision = unwrap_or_default_precision(value.precision);
row_writer::write_ts_precision(
table_data,
INFLUXDB_TIMESTAMP_COLUMN_NAME,
ts,
precision,
&mut one_row,
)?;

table_data.add_row(one_row);
}
}

Ok(())
}

fn parse_ts(
ts: Option<i64>,
precision: Option<Precision>,
column_indexes: &mut HashMap<&str, usize>,
schema: &mut Vec<ColumnSchema>,
one_row: &mut Vec<Value>,
) -> Result<(), Error> {
let precision = unwrap_or_default_precision(precision);
let ts = match ts {
Some(timestamp) => writer::to_ms_ts(precision, timestamp),
None => {
let timestamp = Timestamp::current_millis();
let unit = get_time_unit(precision)?;
let timestamp = timestamp
.convert_to(unit)
.with_context(|| TimePrecisionSnafu {
name: precision.to_string(),
})?;
writer::to_ms_ts(precision, timestamp.into())
}
};

let column_name = INFLUXDB_TIMESTAMP_COLUMN_NAME;
let index = column_indexes.entry(column_name).or_insert(schema.len());
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: column_name.to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
});
one_row.push(ValueData::TsMillisecondValue(ts).into())
} else {
check_schema(
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
&schema[*index],
)?;
one_row[*index].value_data = Some(ValueData::TsMillisecondValue(ts));
Ok(multi_table_data.into_row_insert_requests().0)
}

Ok(())
}

#[inline]
// Verifies that an incoming column's datatype and semantic type agree with the
// schema already recorded for that column; returns an `IncompatibleSchema`
// error naming the column and the mismatched attribute otherwise.
//
// NOTE(review): the second check reuses the error-context field named
// `datatype` to carry the literal "semantic_type" — presumably that field is a
// generic "which attribute mismatched" label; confirm against the snafu
// definition of `IncompatibleSchemaSnafu`.
fn check_schema(
    datatype: ColumnDataType,
    semantic_type: SemanticType,
    schema: &ColumnSchema,
) -> Result<(), Error> {
    // Datatype must match the one previously registered for this column.
    ensure!(
        schema.datatype == datatype as i32,
        IncompatibleSchemaSnafu {
            column_name: &schema.column_name,
            datatype: "datatype",
            expected: schema.datatype,
            actual: datatype as i32,
        }
    );

    // Semantic type (tag/field/timestamp) must match as well.
    ensure!(
        schema.semantic_type == semantic_type as i32,
        IncompatibleSchemaSnafu {
            column_name: &schema.column_name,
            datatype: "semantic_type",
            expected: schema.semantic_type,
            actual: semantic_type as i32,
        }
    );

    Ok(())
}

#[inline]
Expand All @@ -360,25 +204,11 @@ fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
}
}

#[inline]
fn get_time_unit(precision: Precision) -> Result<TimeUnit, Error> {
Ok(match precision {
Precision::Second => TimeUnit::Second,
Precision::Millisecond => TimeUnit::Millisecond,
Precision::Microsecond => TimeUnit::Microsecond,
Precision::Nanosecond => TimeUnit::Nanosecond,
_ => {
return Err(Error::NotSupported {
feat: format!("convert {precision} into TimeUnit"),
})
}
})
}

#[cfg(test)]
mod tests {
use api::v1::column::Values;
use api::v1::{Column, ColumnDataType, SemanticType};
use api::v1::value::ValueData;
use api::v1::{Column, ColumnDataType, Rows, SemanticType};
use common_base::BitVec;

use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub mod postgres;
pub mod prom_store;
pub mod prometheus_handler;
pub mod query_handler;
mod row_writer;
pub mod server;
mod shutdown;
pub mod tls;
Expand Down
Loading

0 comments on commit 4653660

Please sign in to comment.